This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5f01571f4c6 Pipe: tree/table model isolation for
alter/start/stop/drop/show pipe operations & support 'mode.double-living'
(#14386)
5f01571f4c6 is described below
commit 5f01571f4c64033c460545ff885f320433ff8bfc
Author: VGalaxies <[email protected]>
AuthorDate: Thu Jan 9 19:18:54 2025 +0800
Pipe: tree/table model isolation for alter/start/stop/drop/show pipe
operations & support 'mode.double-living' (#14386)
---
.../iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java | 3 +-
.../it/autocreate/IoTDBPipeSwitchStatusIT.java | 23 +-
.../iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java | 30 +-
.../it/tablemodel/IoTDBPipeDoubleLivingIT.java | 329 ++++++++++++++++++
.../pipe/it/tablemodel/IoTDBPipeIsolationIT.java | 377 +++++++++++++++++++++
.../it/tablemodel/IoTDBPipeSwitchStatusIT.java | 23 +-
.../pipe/it/tablemodel/IoTDBPipeSyntaxIT.java | 33 +-
.../iotdb/pipe/it/tablemodel/TableModelUtils.java | 17 +
.../response/pipe/task/PipeTableResp.java | 7 +
.../iotdb/confignode/manager/ConfigManager.java | 10 +-
.../apache/iotdb/confignode/manager/IManager.java | 23 +-
.../pipe/coordinator/task/PipeTaskCoordinator.java | 74 ++--
.../confignode/persistence/pipe/PipeTaskInfo.java | 21 +-
.../impl/pipe/task/AlterPipeProcedureV2.java | 4 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 16 +-
.../dataregion/IoTDBDataRegionExtractor.java | 20 +-
.../iotdb/db/protocol/client/ConfigNodeClient.java | 14 +
.../config/executor/ClusterConfigTaskExecutor.java | 20 +-
.../execution/config/sys/pipe/AlterPipeTask.java | 8 +-
.../execution/config/sys/pipe/DropPipeTask.java | 7 +-
.../execution/config/sys/pipe/ShowPipeTask.java | 3 +-
.../execution/config/sys/pipe/StartPipeTask.java | 7 +-
.../execution/config/sys/pipe/StopPipeTask.java | 7 +-
.../plan/relational/sql/ast/AlterPipe.java | 22 +-
.../plan/relational/sql/ast/DropPipe.java | 10 +-
.../plan/relational/sql/ast/ShowPipes.java | 10 +-
.../plan/relational/sql/ast/StartPipe.java | 8 +-
.../plan/relational/sql/ast/StopPipe.java | 8 +-
.../metadata/pipe/AlterPipeStatement.java | 31 +-
.../statement/metadata/pipe/DropPipeStatement.java | 23 +-
.../metadata/pipe/ShowPipesStatement.java | 18 +-
.../metadata/pipe/StartPipeStatement.java | 17 +-
.../statement/metadata/pipe/StopPipeStatement.java | 17 +-
.../commons/pipe/agent/task/meta/PipeMeta.java | 40 +++
.../pipe/agent/task/meta/PipeMetaKeeper.java | 8 +
.../config/constant/PipeExtractorConstant.java | 3 +
.../pipe/datastructure/pattern/TablePattern.java | 21 +-
.../pipe/datastructure/pattern/TreePattern.java | 22 +-
.../commons/pipe/extractor/IoTDBExtractor.java | 65 +++-
.../src/main/thrift/confignode.thrift | 19 ++
40 files changed, 1239 insertions(+), 179 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
index 66bae32c5b1..a2f5c65cb73 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
-import org.apache.iotdb.itbase.env.BaseEnv;
import org.junit.Assert;
import org.junit.Test;
@@ -106,7 +105,7 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
}
// Alter pipe (modify)
- try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute("alter pipe a2b modify source
('source.pattern'='root.test2')");
} catch (SQLException e) {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSwitchStatusIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSwitchStatusIT.java
index cdcc41ad53f..03e52a982e6 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSwitchStatusIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSwitchStatusIT.java
@@ -266,11 +266,14 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualAutoIT {
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.startPipe("").getCode());
Assert.assertEquals(
- TSStatusCode.PIPE_ERROR.getStatusCode(),
client.startPipe("p0").getCode());
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.startPipe("p").getCode());
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.startPipe("*").getCode());
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.startPipe("").getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.startPipe("p0").getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.startPipe("p").getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.startPipe("*").getCode());
List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
@@ -281,10 +284,14 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualAutoIT {
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.stopPipe("").getCode());
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.stopPipe("p0").getCode());
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.stopPipe("p").getCode());
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.stopPipe("*").getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.stopPipe("").getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.stopPipe("p0").getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.stopPipe("p").getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.stopPipe("*").getCode());
showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java
index a81b02f7dd7..1665f9d2ce2 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java
@@ -65,7 +65,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelTestIT {
long lastCreationTime;
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
// Check status
Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
@@ -96,7 +97,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelTestIT {
// Show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
// Check status
Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
@@ -114,7 +116,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelTestIT {
// Show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
// Check status
Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
@@ -149,7 +152,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelTestIT {
// Show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
// check status
Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
@@ -183,7 +187,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelTestIT {
// Show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
// Check status
Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
@@ -225,7 +230,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelTestIT {
// Show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
// Check status
Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
@@ -257,7 +263,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelTestIT {
// show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
// check status
Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
@@ -289,7 +296,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelTestIT {
// show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
// check status
Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
@@ -323,7 +331,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelTestIT {
// show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
// check status
Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
@@ -354,7 +363,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelTestIT {
// Show pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
// Check status
Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDoubleLivingIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDoubleLivingIT.java
new file mode 100644
index 00000000000..f6573728cf2
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDoubleLivingIT.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.tablemodel;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2TableModel;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2TableModel.class})
+public class IoTDBPipeDoubleLivingIT extends AbstractPipeTableModelTestIT {
+
+ @Test
+ public void testDoubleLivingInvalidParameter() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s"
+ + " with source ("
+ + "'capture.tree'='false',"
+ + "'mode.double-living'='true')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+ "p1", receiverDataNode.getIpAndPortString()));
+ fail();
+ } catch (final SQLException ignored) {
+ }
+
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s"
+ + " with source ("
+ + "'capture.table'='false',"
+ + "'mode.double-living'='true')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+ "p2", receiverDataNode.getIpAndPortString()));
+ fail();
+ } catch (final SQLException ignored) {
+ }
+
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s"
+ + " with source ("
+ + "'forwarding-pipe-requests'='true',"
+ + "'mode.double-living'='true')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+ "p3", receiverDataNode.getIpAndPortString()));
+ fail();
+ } catch (final SQLException ignored) {
+ }
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+ Assert.assertEquals(0, showPipeResult.size());
+ }
+ }
+
+ // combination of org.apache.iotdb.pipe.it.tablemodel.IoTDBPipeLifeCycleIT.testDoubleLiving and
+ // org.apache.iotdb.pipe.it.autocreate.IoTDBPipeLifeCycleIT.testDoubleLiving
+ @Test
+ public void testBasicDoubleLiving() {
+ boolean insertResult;
+
+ final DataNodeWrapper senderDataNode = senderEnv.getDataNodeWrapper(0);
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+ final Consumer<String> handleFailure =
+ o -> {
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ };
+
+ // insertion on sender
+ for (int i = 0; i < 100; ++i) {
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i))) {
+ return;
+ }
+ }
+ TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
+ insertResult = TableModelUtils.insertData("test", "test", 0, 100,
senderEnv);
+ if (!insertResult) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ return;
+ }
+
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s"
+ + " with source ("
+ + "'database-name'='test',"
+ + "'table-name'='test',"
+ + "'path'='root.db.d1.s1',"
+ + "'mode.double-living'='true')"
+ + " with sink ("
+ + "'batch.enable'='false',"
+ + "'node-urls'='%s')",
+ "p1", receiverDataNode.getIpAndPortString()));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // insertion on sender and receiver
+ for (int i = 100; i < 200; ++i) {
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i))) {
+ return;
+ }
+ }
+ for (int i = 200; i < 300; ++i) {
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ receiverEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i))) {
+ return;
+ }
+ }
+ insertResult = TableModelUtils.insertData("test", "test", 100, 200,
senderEnv);
+ if (!insertResult) {
+ return;
+ }
+ insertResult = TableModelUtils.insertData("test", "test", 200, 300,
receiverEnv);
+ if (!insertResult) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ return;
+ }
+
+ try (final Connection connection =
receiverEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s"
+ + " with source ("
+ + "'database-name'='test',"
+ + "'table-name'='test',"
+ + "'path'='root.db.d1.s1',"
+ + "'mode.double-living'='true')"
+ + " with sink ("
+ + "'batch.enable'='false',"
+ + "'node-urls'='%s')",
+ "p2", senderDataNode.getIpAndPortString()));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // insertion on receiver
+ for (int i = 300; i < 400; ++i) {
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ receiverEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i))) {
+ return;
+ }
+ }
+ insertResult = TableModelUtils.insertData("test", "test", 300, 400,
receiverEnv);
+ if (!insertResult) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, "flush")) {
+ return;
+ }
+
+ // check result
+ final Set<String> expectedResSet = new HashSet<>();
+ for (int i = 0; i < 400; ++i) {
+ expectedResSet.add(i + ",1.0,");
+ }
+ TestUtils.assertDataEventuallyOnEnv(
+ senderEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
+ TableModelUtils.assertData("test", "test", 0, 400, senderEnv,
handleFailure);
+ TableModelUtils.assertData("test", "test", 0, 400, receiverEnv,
handleFailure);
+
+ // restart cluster
+ try {
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return;
+ }
+
+ // insertion on receiver
+ for (int i = 400; i < 500; ++i) {
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ receiverEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i))) {
+ return;
+ }
+ }
+ insertResult = TableModelUtils.insertData("test", "test", 400, 500,
receiverEnv);
+ if (!insertResult) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, "flush")) {
+ return;
+ }
+
+ // check result
+ for (int i = 400; i < 500; ++i) {
+ expectedResSet.add(i + ",1.0,");
+ }
+ TestUtils.assertDataEventuallyOnEnv(
+ senderEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
+ TableModelUtils.assertData("test", "test", 0, 500, senderEnv,
handleFailure);
+ TableModelUtils.assertData("test", "test", 0, 500, receiverEnv,
handleFailure);
+ }
+
+ @Test
+ public void testDoubleLivingIsolation() throws Exception {
+ final String treePipeName = "treePipe";
+ final String tablePipeName = "tablePipe";
+
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // Create tree pipe
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s"
+ + " with source ("
+ + "'mode.double-living'='true')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+ treePipeName, receiverDataNode.getIpAndPortString()));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Show tree pipe by tree session
+ Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TREE_SQL_DIALECT));
+
+ // Show pipes by table session: the pipe created in the tree session is expected to be counted here too
+ Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TABLE_SQL_DIALECT));
+
+ // Create table pipe
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s"
+ + " with source ("
+ + "'mode.double-living'='true')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+ tablePipeName, receiverDataNode.getIpAndPortString()));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Show tree pipe by tree session
+ Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TREE_SQL_DIALECT));
+
+ // Show table pipe by table session
+ Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TABLE_SQL_DIALECT));
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ // Drop each pipe with the opposite isTableModel flag; both drops are expected to succeed
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client.dropPipeExtended(new
TDropPipeReq(treePipeName).setIsTableModel(true)).getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .dropPipeExtended(new
TDropPipeReq(tablePipeName).setIsTableModel(false))
+ .getCode());
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeIsolationIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeIsolationIT.java
new file mode 100644
index 00000000000..57a3be38e21
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeIsolationIT.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.tablemodel;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2TableModel;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2TableModel.class})
+public class IoTDBPipeIsolationIT extends AbstractPipeTableModelTestIT {
+
+ @Test
+ public void testWritePipeIsolation() throws Exception {
+ final String treePipeName = "treePipe";
+ final String tablePipeName = "tablePipe";
+
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // Create tree pipe
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s with sink ('node-urls'='%s')",
+ treePipeName, receiverDataNode.getIpAndPortString()));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Create table pipe
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s with sink ('node-urls'='%s')",
+ tablePipeName, receiverDataNode.getIpAndPortString()));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ // Start pipe
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+ client
+ .startPipeExtended(new
TStartPipeReq(treePipeName).setIsTableModel(true))
+ .getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+ client
+ .startPipeExtended(new
TStartPipeReq(tablePipeName).setIsTableModel(false))
+ .getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .startPipeExtended(new
TStartPipeReq(treePipeName).setIsTableModel(false))
+ .getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .startPipeExtended(new
TStartPipeReq(tablePipeName).setIsTableModel(true))
+ .getCode());
+
+ // Stop pipe
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+ client.stopPipeExtended(new
TStopPipeReq(treePipeName).setIsTableModel(true)).getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+ client
+ .stopPipeExtended(new
TStopPipeReq(tablePipeName).setIsTableModel(false))
+ .getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client.stopPipeExtended(new
TStopPipeReq(treePipeName).setIsTableModel(false)).getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client.stopPipeExtended(new
TStopPipeReq(tablePipeName).setIsTableModel(true)).getCode());
+
+ // Alter pipe
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+ client
+ .alterPipe(
+ new TAlterPipeReq(
+ treePipeName,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ false,
+ false)
+ .setExtractorAttributes(Collections.emptyMap())
+ .setIsReplaceAllExtractorAttributes(false)
+ .setIsTableModel(true))
+ .getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+ client
+ .alterPipe(
+ new TAlterPipeReq(
+ tablePipeName,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ false,
+ false)
+ .setExtractorAttributes(Collections.emptyMap())
+ .setIsReplaceAllExtractorAttributes(false)
+ .setIsTableModel(false))
+ .getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .alterPipe(
+ new TAlterPipeReq(
+ treePipeName,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ false,
+ false)
+ .setExtractorAttributes(Collections.emptyMap())
+ .setIsReplaceAllExtractorAttributes(false)
+ .setIsTableModel(false))
+ .getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .alterPipe(
+ new TAlterPipeReq(
+ tablePipeName,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ false,
+ false)
+ .setExtractorAttributes(Collections.emptyMap())
+ .setIsReplaceAllExtractorAttributes(false)
+ .setIsTableModel(true))
+ .getCode());
+
+ // Drop pipe
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+ client.dropPipeExtended(new
TDropPipeReq(treePipeName).setIsTableModel(true)).getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+ client
+ .dropPipeExtended(new
TDropPipeReq(tablePipeName).setIsTableModel(false))
+ .getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client.dropPipeExtended(new
TDropPipeReq(treePipeName).setIsTableModel(false)).getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client.dropPipeExtended(new
TDropPipeReq(tablePipeName).setIsTableModel(true)).getCode());
+ }
+ }
+
+ /**
+ * Checks that {@code show pipes} is isolated per SQL dialect for pipes created with default
+ * source settings: a pipe created from a tree session is counted by the tree session only, and a
+ * pipe created from a table session is counted by the table session only.
+ */
+ @Test
+ public void testReadPipeIsolation() {
+ final String treePipeName = "treePipe";
+ final String tablePipeName = "tablePipe";
+
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // 1. Create tree pipe by tree session
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s with sink ('node-urls'='%s')",
+ treePipeName, receiverDataNode.getIpAndPortString()));
+ } catch (final SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // Show tree pipe by tree session
+ // Expect exactly one row: the pipe that was just created through the tree dialect.
+ Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TREE_SQL_DIALECT));
+
+ // Show table pipe by table session
+ // Expect no rows: no pipe has been created through the table dialect yet.
+ Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TABLE_SQL_DIALECT));
+
+ // 2. Create table pipe by table session
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s with sink ('node-urls'='%s')",
+ tablePipeName, receiverDataNode.getIpAndPortString()));
+ } catch (final SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // Show tree pipe by tree session
+ // The tree-side count must stay at one even though a second pipe now exists.
+ Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TREE_SQL_DIALECT));
+
+ // Show table pipe by table session
+ // The table-side count rises to one after the table-dialect creation.
+ Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TABLE_SQL_DIALECT));
+ }
+
+ /**
+ * Checks the behaviour of pipes whose source sets both {@code 'capture.tree'} and {@code
+ * 'capture.table'} to {@code 'true'}: each such pipe is counted by {@code show pipes} in both
+ * dialects regardless of the dialect that created it, and dropping it through {@code
+ * dropPipeExtended} is expected to succeed with either {@code isTableModel} value.
+ */
+ @Test
+ public void testCaptureTreeAndTableIsolation() throws Exception {
+ final String treePipeName = "tree_a2b";
+ final String tablePipeName = "table_a2b";
+
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // 1. Create tree pipe by tree session
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s"
+ + " with source ("
+ + "'capture.tree'='true',"
+ + "'capture.table'='true')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+ treePipeName, receiverDataNode.getIpAndPortString()));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Show tree pipe by tree session
+ Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TREE_SQL_DIALECT));
+
+ // Show table pipe by table session
+ // The pipe was created by a tree session but is expected to be listed here as well.
+ Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TABLE_SQL_DIALECT));
+
+ // 2. Create table pipe by table session
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s"
+ + " with source ("
+ + "'capture.tree'='true',"
+ + "'capture.table'='true')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+ tablePipeName, receiverDataNode.getIpAndPortString()));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Show tree pipe by tree session
+ // Both pipes are expected to be listed in each dialect from here on.
+ Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TREE_SQL_DIALECT));
+
+ // Show table pipe by table session
+ Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TABLE_SQL_DIALECT));
+
+ // 3. Drop pipe
+ // The isTableModel flags are deliberately crossed: the pipe created by the tree session is
+ // dropped with isTableModel=true and the pipe created by the table session with
+ // isTableModel=false; both drops are expected to succeed.
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client.dropPipeExtended(new
TDropPipeReq(treePipeName).setIsTableModel(true)).getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .dropPipeExtended(new
TDropPipeReq(tablePipeName).setIsTableModel(false))
+ .getCode());
+ }
+ }
+
+ /**
+ * Exercises corner cases of the {@code 'capture.tree'} / {@code 'capture.table'} source options:
+ * a pipe created from one dialect that captures only the other model, and a pipe that disables
+ * both options, whose creation is expected to be rejected with a {@link SQLException}.
+ */
+ @Test
+ public void testCaptureCornerCases() {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // 1. Create tree pipe but capture table data
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s"
+ + " with source ("
+ + "'capture.tree'='false',"
+ + "'capture.table'='true')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+ "p1", receiverDataNode.getIpAndPortString()));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Show tree pipe by tree session
+ // p1 was created by a tree session, yet it is expected not to be listed for the tree dialect.
+ Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TREE_SQL_DIALECT));
+
+ // Show table pipe by table session
+ Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TABLE_SQL_DIALECT));
+
+ // 2. Create table pipe but capture tree data
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s"
+ + " with source ("
+ + "'capture.tree'='true',"
+ + "'capture.table'='false')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+ "p2", receiverDataNode.getIpAndPortString()));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Show tree pipe by tree session
+ // One pipe per dialect is expected, consistent with p2 being listed only for the tree dialect
+ // and p1 only for the table dialect.
+ Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TREE_SQL_DIALECT));
+
+ // Show table pipe by table session
+ Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TABLE_SQL_DIALECT));
+
+ // 3. Create pipe with capture.tree and capture.table set to false
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe %s"
+ + " with source ("
+ + "'capture.tree'='false',"
+ + "'capture.table'='false')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+ "p3", receiverDataNode.getIpAndPortString()));
+ // Reaching this line means the statement was accepted, which the test treats as a failure.
+ fail();
+ } catch (final SQLException ignored) {
+ // Expected path: the statement must be rejected, so the exception is intentionally ignored.
+ }
+
+ // Show tree pipe by tree session
+ // The counts must be unchanged, i.e. the rejected statement must not have created p3.
+ Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TREE_SQL_DIALECT));
+
+ // Show table pipe by table session
+ Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv,
BaseEnv.TABLE_SQL_DIALECT));
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSwitchStatusIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSwitchStatusIT.java
index 5c5cbcd7b95..4c80f0ae673 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSwitchStatusIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSwitchStatusIT.java
@@ -303,11 +303,14 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeTableModelTestIT {
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.startPipe("").getCode());
Assert.assertEquals(
- TSStatusCode.PIPE_ERROR.getStatusCode(),
client.startPipe("p0").getCode());
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.startPipe("p").getCode());
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.startPipe("*").getCode());
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.startPipe("").getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.startPipe("p0").getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.startPipe("p").getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.startPipe("*").getCode());
List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
@@ -318,10 +321,14 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeTableModelTestIT {
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.stopPipe("").getCode());
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.stopPipe("p0").getCode());
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.stopPipe("p").getCode());
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.stopPipe("*").getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.stopPipe("").getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.stopPipe("p0").getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.stopPipe("p").getCode());
+ Assert.assertEquals(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.stopPipe("*").getCode());
showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSyntaxIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSyntaxIT.java
index 24b1c92563b..09658a583b0 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSyntaxIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSyntaxIT.java
@@ -89,7 +89,8 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelTestIT {
}
}
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
for (final String pipeName : expectedPipeNames) {
Assert.assertTrue(
showPipeResult.stream()
@@ -106,7 +107,7 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelTestIT {
}
}
- showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ showPipeResult = client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(0, showPipeResult.size());
}
}
@@ -124,7 +125,7 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelTestIT {
String.format(
"create pipe p1"
+ " with source ( "
- + "'capture.table'='test',"
+ + "'capture.table'='true',"
+ "'database-name'='test',"
+ "'table-name'='test',"
+ "'mode.streaming'='true',"
@@ -177,7 +178,8 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelTestIT {
} catch (SQLException ignored) {
}
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(0, showPipeResult.size());
}
}
@@ -249,7 +251,8 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelTestIT {
fail(e.getMessage());
}
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(2, showPipeResult.size());
}
}
@@ -315,7 +318,8 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelTestIT {
} catch (SQLException ignored) {
}
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
}
}
@@ -522,7 +526,8 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelTestIT {
} catch (Exception ignored) {
}
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(2, showPipeResult.size());
}
}
@@ -540,6 +545,7 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelTestIT {
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();
+ extractorAttributes.put("__system.sql-dialect", "table");
extractorAttributes.put("extractor.database-name", "test");
extractorAttributes.put("extractor.table-name", "test.*");
extractorAttributes.put("extractor.inclusion", "data.insert");
@@ -578,10 +584,12 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelTestIT {
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(3, showPipeResult.size());
- showPipeResult = client.showPipe(new
TShowPipeReq().setPipeName("p1")).pipeInfoList;
+ showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true).setPipeName("p1")).pipeInfoList;
Assert.assertTrue(showPipeResult.stream().anyMatch((o) ->
o.id.equals("p1")));
Assert.assertFalse(showPipeResult.stream().anyMatch((o) ->
o.id.equals("p2")));
Assert.assertFalse(showPipeResult.stream().anyMatch((o) ->
o.id.equals("p3")));
@@ -589,7 +597,9 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelTestIT {
// Show all pipes whose connector is also used by p1.
// p1 and p2 share the same connector parameters, so they have the same
connector.
showPipeResult =
- client.showPipe(new
TShowPipeReq().setPipeName("p1").setWhereClause(true)).pipeInfoList;
+ client.showPipe(
+ new
TShowPipeReq().setIsTableModel(true).setPipeName("p1").setWhereClause(true))
+ .pipeInfoList;
Assert.assertTrue(showPipeResult.stream().anyMatch((o) ->
o.id.equals("p1")));
Assert.assertTrue(showPipeResult.stream().anyMatch((o) ->
o.id.equals("p2")));
Assert.assertFalse(showPipeResult.stream().anyMatch((o) ->
o.id.equals("p3")));
@@ -678,7 +688,8 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeTableModelTestIT {
fail(e.getMessage());
}
- final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ final List<TShowPipeInfo> showPipeResult =
+ client.showPipe(new
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
index c7da1382535..a3c8c375742 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
@@ -37,6 +37,8 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
@@ -772,4 +774,19 @@ public class TableModelUtils {
return tablet;
}
+
+ /**
+ * Counts the rows returned by {@code show pipes} on a fresh connection opened with the given SQL
+ * dialect.
+ *
+ * @param baseEnv the environment the connection is obtained from
+ * @param sqlDialect the dialect passed to {@code baseEnv.getConnection}
+ * @return the number of rows in the result set; when a {@link SQLException} is raised, {@code
+ *     fail} is invoked with the exception message instead
+ */
+ public static int showPipesCount(final BaseEnv baseEnv, final String sqlDialect) {
+   // The ResultSet is declared as a resource as well, so it is closed explicitly (and before the
+   // statement and connection) instead of relying on the statement's close to release it.
+   try (final Connection connection = baseEnv.getConnection(sqlDialect);
+       final Statement statement = connection.createStatement();
+       final ResultSet resultSet = statement.executeQuery("show pipes")) {
+     int count = 0;
+     while (resultSet.next()) {
+       count++;
+     }
+     return count;
+   } catch (final SQLException e) {
+     fail(e.getMessage());
+   }
+   // NOTE(review): fail(...) is presumably the JUnit assertion that always throws, which would
+   // make this return exist only to satisfy the compiler — confirm against the static import.
+   return 0;
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index 020690f2465..20cd76e4a41 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -94,6 +94,13 @@ public class PipeTableResp implements DataSet {
}
}
+ /**
+ * Applies the two-argument {@code filter(whereClause, pipeName)} and then removes every pipe
+ * meta for which {@code visibleUnder(isTableModel)} returns {@code false}.
+ *
+ * @param whereClause forwarded unchanged to the two-argument overload
+ * @param pipeName forwarded unchanged to the two-argument overload
+ * @param isTableModel the model flag passed to {@code visibleUnder} for each pipe meta
+ * @return the response produced by the two-argument overload, with non-visible metas removed
+ */
+ public PipeTableResp filter(
+ final Boolean whereClause, final String pipeName, final boolean
isTableModel) {
+ final PipeTableResp resp = filter(whereClause, pipeName);
+ // NOTE(review): removeIf mutates resp.allPipeMeta in place. If the two-argument overload can
+ // return this instance (or share its list), this response is modified as well — confirm.
+ resp.allPipeMeta.removeIf(meta -> !meta.visibleUnder(isTableModel));
+ return resp;
+ }
+
public TGetAllPipeInfoResp convertToTGetAllPipeInfoResp() throws IOException
{
final List<ByteBuffer> pipeInformationByteBuffers = new ArrayList<>();
for (final PipeMeta pipeMeta : allPipeMeta) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 130f1dcd15c..6f6d625768c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -226,6 +226,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
@@ -2193,18 +2195,18 @@ public class ConfigManager implements IManager {
}
@Override
- public TSStatus startPipe(String pipeName) {
+ public TSStatus startPipe(TStartPipeReq req) {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- ? pipeManager.getPipeTaskCoordinator().startPipe(pipeName)
+ ? pipeManager.getPipeTaskCoordinator().startPipe(req)
: status;
}
@Override
- public TSStatus stopPipe(String pipeName) {
+ public TSStatus stopPipe(TStopPipeReq req) {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- ? pipeManager.getPipeTaskCoordinator().stopPipe(pipeName)
+ ? pipeManager.getPipeTaskCoordinator().stopPipe(req)
: status;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 63e527440ed..82dae052ef2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -147,6 +147,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
@@ -694,34 +696,39 @@ public interface IManager {
* Alter Pipe.
*
* @param req Info about Pipe
- * @return TSStatus
+ * @return {@link TSStatusCode#SUCCESS_STATUS} if altered the pipe
successfully, {@link
+ * TSStatusCode#PIPE_ERROR} if encountered failure, {@link
TSStatusCode#PIPE_NOT_EXIST_ERROR}
+ * if the pipe does not exist.
*/
TSStatus alterPipe(TAlterPipeReq req);
/**
* Start Pipe.
*
- * @param pipeName name of Pipe
+ * @param req Info about Pipe
* @return {@link TSStatusCode#SUCCESS_STATUS} if started the pipe
successfully, {@link
- * TSStatusCode#PIPE_ERROR} if encountered failure.
+ * TSStatusCode#PIPE_ERROR} if encountered failure, {@link
TSStatusCode#PIPE_NOT_EXIST_ERROR}
+ * if the pipe does not exist.
*/
- TSStatus startPipe(String pipeName);
+ TSStatus startPipe(TStartPipeReq req);
/**
* Stop Pipe.
*
- * @param pipeName name of Pipe
+ * @param req Info about Pipe
* @return {@link TSStatusCode#SUCCESS_STATUS} if stopped the pipe
successfully, {@link
- * TSStatusCode#PIPE_ERROR} if encountered failure.
+ * TSStatusCode#PIPE_ERROR} if encountered failure, {@link
TSStatusCode#PIPE_NOT_EXIST_ERROR}
+ * if the pipe does not exist.
*/
- TSStatus stopPipe(String pipeName);
+ TSStatus stopPipe(TStopPipeReq req);
/**
* Drop Pipe.
*
* @param req Info about Pipe
* @return {@link TSStatusCode#SUCCESS_STATUS} if dropped the pipe
successfully, {@link
- * TSStatusCode#PIPE_ERROR} if encountered failure.
+ * TSStatusCode#PIPE_ERROR} if encountered failure, {@link
TSStatusCode#PIPE_NOT_EXIST_ERROR}
+ * if the pipe does not exist.
*/
TSStatus dropPipe(TDropPipeReq req);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 7de39849e59..4aaf3ab46c3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -116,7 +118,7 @@ public class PipeTaskCoordinator {
/** Caller should ensure that the method is called in the lock {@link
#lock()}. */
public TSStatus createPipe(TCreatePipeReq req) {
- TSStatus status = null;
+ final TSStatus status;
if (req.getPipeName().startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
status = configManager.getProcedureManager().createConsensusPipe(req);
} else {
@@ -130,6 +132,17 @@ public class PipeTaskCoordinator {
/** Caller should ensure that the method is called in the lock {@link
#lock()}. */
public TSStatus alterPipe(TAlterPipeReq req) {
+ final String pipeName = req.getPipeName();
+ final boolean isSetIfExistsCondition =
+ req.isSetIfExistsCondition() && req.isIfExistsCondition();
+ if (!pipeTaskInfo.isPipeExisted(pipeName, req.isTableModel)) {
+ return isSetIfExistsCondition
+ ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+ : RpcUtils.getStatus(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR,
+ String.format(
+ "Failed to alter pipe %s. Failures: %s does not exist.",
pipeName, pipeName));
+ }
final TSStatus status = configManager.getProcedureManager().alterPipe(req);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn("Failed to alter pipe {}. Result status: {}.",
req.getPipeName(), status);
@@ -138,8 +151,8 @@ public class PipeTaskCoordinator {
}
/** Caller should ensure that the method is called in the lock {@link
#lock()}. */
- public TSStatus startPipe(String pipeName) {
- TSStatus status = null;
+ private TSStatus startPipe(String pipeName) {
+ final TSStatus status;
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
status =
configManager.getProcedureManager().startConsensusPipe(pipeName);
} else {
@@ -152,9 +165,21 @@ public class PipeTaskCoordinator {
}
/** Caller should ensure that the method is called in the lock {@link
#lock()}. */
- public TSStatus stopPipe(String pipeName) {
+ public TSStatus startPipe(TStartPipeReq req) {
+ // Request-based entry point: verifies that the named pipe exists under the model given by
+ // req.isTableModel (read directly from the request field) before delegating to the private
+ // name-based startPipe(String).
+ final String pipeName = req.getPipeName();
+ if (!pipeTaskInfo.isPipeExisted(pipeName, req.isTableModel)) {
+ // No matching pipe for this name/model pair: report PIPE_NOT_EXIST_ERROR without delegating.
+ return RpcUtils.getStatus(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR,
+ String.format(
+ "Failed to start pipe %s. Failures: %s does not exist.",
pipeName, pipeName));
+ }
+ return startPipe(pipeName);
+ }
+
+ /** Caller should ensure that the method is called in the lock {@link
#lock()}. */
+ private TSStatus stopPipe(String pipeName) {
final boolean isStoppedByRuntimeException =
pipeTaskInfo.isStoppedByRuntimeException(pipeName);
- TSStatus status = null;
+ final TSStatus status;
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
status = configManager.getProcedureManager().stopConsensusPipe(pipeName);
} else {
@@ -176,11 +201,32 @@ public class PipeTaskCoordinator {
return status;
}
+ /**
+ * Stops the pipe named in the request after checking that it exists under the model given by
+ * {@code req.isTableModel}. Caller should ensure that the method is called in the lock {@link
+ * #lock()}.
+ *
+ * @param req the stop request; its pipe name and {@code isTableModel} field are read
+ * @return a {@link TSStatusCode#PIPE_NOT_EXIST_ERROR} status when the existence check fails,
+ *     otherwise the status returned by the private name-based {@code stopPipe(String)}
+ */
+ public TSStatus stopPipe(TStopPipeReq req) {
+ final String pipeName = req.getPipeName();
+ if (!pipeTaskInfo.isPipeExisted(pipeName, req.isTableModel)) {
+ // No matching pipe for this name/model pair: report PIPE_NOT_EXIST_ERROR without delegating.
+ return RpcUtils.getStatus(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR,
+ String.format(
+ "Failed to stop pipe %s. Failures: %s does not exist.",
pipeName, pipeName));
+ }
+ return stopPipe(pipeName);
+ }
+
/** Caller should ensure that the method is called in the lock {@link
#lock()}. */
public TSStatus dropPipe(TDropPipeReq req) {
final String pipeName = req.getPipeName();
- final boolean isPipeExistedBeforeDrop =
pipeTaskInfo.isPipeExisted(pipeName);
- TSStatus status = null;
+ final boolean isSetIfExistsCondition =
+ req.isSetIfExistsCondition() && req.isIfExistsCondition();
+ if (!pipeTaskInfo.isPipeExisted(pipeName, req.isTableModel)) {
+ return isSetIfExistsCondition
+ ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+ : RpcUtils.getStatus(
+ TSStatusCode.PIPE_NOT_EXIST_ERROR,
+ String.format(
+ "Failed to drop pipe %s. Failures: %s does not exist.",
pipeName, pipeName));
+ }
+ final TSStatus status;
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
status = configManager.getProcedureManager().dropConsensusPipe(pipeName);
} else {
@@ -189,23 +235,13 @@ public class PipeTaskCoordinator {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn("Failed to drop pipe {}. Result status: {}.", pipeName,
status);
}
-
- final boolean isSetIfExistsCondition =
- req.isSetIfExistsCondition() && req.isIfExistsCondition();
- // If the `IF EXISTS` condition is not set and the pipe does not exist
before the delete
- // operation, return an error status indicating that the pipe does not
exist.
- return isPipeExistedBeforeDrop || isSetIfExistsCondition
- ? status
- : RpcUtils.getStatus(
- TSStatusCode.PIPE_NOT_EXIST_ERROR,
- String.format(
- "Failed to drop pipe %s. Failures: %s does not exist.",
pipeName, pipeName));
+ return status;
}
public TShowPipeResp showPipes(final TShowPipeReq req) {
try {
return ((PipeTableResp) configManager.getConsensusManager().read(new
ShowPipePlanV2()))
- .filter(req.whereClause, req.pipeName)
+ .filter(req.whereClause, req.pipeName, req.isTableModel)
.convertToTShowPipeResp();
} catch (final ConsensusException e) {
LOGGER.warn("Failed in the read API executing the consensus layer due
to: ", e);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 372209bfd74..8fe6659dc0c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -187,23 +187,19 @@ public class PipeTaskInfo implements SnapshotProcessor {
throw new PipeException(exceptionMessage);
}
- public boolean checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq
alterPipeRequest)
+ public void checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq
alterPipeRequest)
throws PipeException {
acquireReadLock();
try {
- return checkAndUpdateRequestBeforeAlterPipeInternal(alterPipeRequest);
+ checkAndUpdateRequestBeforeAlterPipeInternal(alterPipeRequest);
} finally {
releaseReadLock();
}
}
- private boolean checkAndUpdateRequestBeforeAlterPipeInternal(final
TAlterPipeReq alterPipeRequest)
+ private void checkAndUpdateRequestBeforeAlterPipeInternal(final
TAlterPipeReq alterPipeRequest)
throws PipeException {
if (!isPipeExisted(alterPipeRequest.getPipeName())) {
- if (alterPipeRequest.isSetIfExistsCondition() &&
alterPipeRequest.isIfExistsCondition()) {
- return false;
- }
-
final String exceptionMessage =
String.format(
"Failed to alter pipe %s, %s", alterPipeRequest.getPipeName(),
PIPE_NOT_EXIST_MSG);
@@ -267,8 +263,6 @@ public class PipeTaskInfo implements SnapshotProcessor {
.getAttribute());
}
}
-
- return true;
}
public void checkBeforeStartPipe(final String pipeName) throws PipeException
{
@@ -351,6 +345,15 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
+ /**
+ * Model-aware existence check: returns the result of {@code
+ * pipeMetaKeeper.containsPipeMeta(pipeName, isTableModel)}, evaluated between {@code
+ * acquireReadLock()} and {@code releaseReadLock()}.
+ *
+ * @param pipeName the name of the pipe to look up
+ * @param isTableModel the model flag forwarded to {@code containsPipeMeta}
+ * @return whatever {@code containsPipeMeta} reports for the given name and model flag
+ */
+ public boolean isPipeExisted(final String pipeName, final boolean
isTableModel) {
+ acquireReadLock();
+ try {
+ return pipeMetaKeeper.containsPipeMeta(pipeName, isTableModel);
+ } finally {
+ // Released in finally so the lock is given back even if the lookup throws.
+ releaseReadLock();
+ }
+ }
+
private PipeStatus getPipeStatus(final String pipeName) {
acquireReadLock();
try {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 8b6aceb9c30..ba7d40abc53 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -96,9 +96,7 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
// We should execute checkBeforeAlterPipe before checking the pipe plugin.
This method will
// update the alterPipeRequest based on the alterPipeRequest and existing
pipe metadata.
- if
(!pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest)) {
- return false;
- }
+ pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest);
final PipeManager pipeManager = env.getConfigManager().getPipeManager();
pipeManager
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 2961a5f7c42..219b68813e6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -200,6 +200,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
import org.apache.iotdb.confignode.rpc.thrift.TTestOperation;
@@ -1113,12 +1115,22 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
@Override
public TSStatus startPipe(String pipeName) {
- return configManager.startPipe(pipeName);
+ return configManager.startPipe(new TStartPipeReq().setPipeName(pipeName));
+ }
+
+ @Override
+ public TSStatus startPipeExtended(TStartPipeReq req) {
+ return configManager.startPipe(req);
}
@Override
public TSStatus stopPipe(String pipeName) {
- return configManager.stopPipe(pipeName);
+ return configManager.stopPipe(new TStopPipeReq().setPipeName(pipeName));
+ }
+
+ @Override
+ public TSStatus stopPipeExtended(TStopPipeReq req) {
+ return configManager.stopPipe(req);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index bf9a67061b4..0a033aff297 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -134,7 +135,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
.getStringOrDefault(
SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE)
.equals(SystemConstant.SQL_DIALECT_TREE_VALUE);
- final boolean isTreeModelDataAllowedToBeCaptured =
+ final boolean isCaptureTree =
validator
.getParameters()
.getBooleanOrDefault(
@@ -142,7 +143,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY,
PipeExtractorConstant.SOURCE_CAPTURE_TREE_KEY),
isTreeDialect);
- final boolean isTableModelDataAllowedToBeCaptured =
+ final boolean isCaptureTable =
validator
.getParameters()
.getBooleanOrDefault(
@@ -150,6 +151,21 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY,
PipeExtractorConstant.SOURCE_CAPTURE_TABLE_KEY),
!isTreeDialect);
+ if (!isCaptureTree && !isCaptureTable) {
+ throw new PipeParameterNotValidException(
+ "capture.tree and capture.table can not both be specified as false");
+ }
+
+ final boolean isDoubleLiving =
+ validator
+ .getParameters()
+ .getBooleanOrDefault(
+ Arrays.asList(
+ PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+ PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+
PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+ final boolean isTreeModelDataAllowedToBeCaptured = isDoubleLiving ||
isCaptureTree;
+ final boolean isTableModelDataAllowedToBeCaptured = isDoubleLiving ||
isCaptureTable;
if (!isTreeModelDataAllowedToBeCaptured
&& validator
.getParameters()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 5fd09f22a9a..f50fbfd63bd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -163,6 +163,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
import org.apache.iotdb.confignode.rpc.thrift.TTestOperation;
@@ -1054,12 +1056,24 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
() -> client.startPipe(pipeName), status ->
!updateConfigNodeLeader(status));
}
+ @Override
+ public TSStatus startPipeExtended(TStartPipeReq req) throws TException {
+ return executeRemoteCallWithRetry(
+ () -> client.startPipeExtended(req), status ->
!updateConfigNodeLeader(status));
+ }
+
@Override
public TSStatus stopPipe(String pipeName) throws TException {
return executeRemoteCallWithRetry(
() -> client.stopPipe(pipeName), status ->
!updateConfigNodeLeader(status));
}
+ @Override
+ public TSStatus stopPipeExtended(TStopPipeReq req) throws TException {
+ return executeRemoteCallWithRetry(
+ () -> client.stopPipeExtended(req), status ->
!updateConfigNodeLeader(status));
+ }
+
@Override
public TSStatus dropPipe(String pipeName) throws TException {
return executeRemoteCallWithRetry(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 4a14c71a3d3..3c41ea2ac2e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -133,6 +133,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -2044,6 +2046,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
req.setExtractorAttributes(alterPipeStatement.getExtractorAttributes());
req.setIsReplaceAllExtractorAttributes(alterPipeStatement.isReplaceAllExtractorAttributes());
req.setIfExistsCondition(alterPipeStatement.hasIfExistsCondition());
+ req.setIsTableModel(alterPipeStatement.isTableModel());
final TSStatus tsStatus = configNodeClient.alterPipe(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
future.setException(new IoTDBException(tsStatus.message,
tsStatus.code));
@@ -2073,7 +2076,11 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final TSStatus tsStatus =
configNodeClient.startPipe(startPipeStatement.getPipeName());
+ final TSStatus tsStatus =
+ configNodeClient.startPipeExtended(
+ new TStartPipeReq()
+ .setPipeName(startPipeStatement.getPipeName())
+ .setIsTableModel(startPipeStatement.isTableModel()));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
future.setException(new IoTDBException(tsStatus.message,
tsStatus.code));
} else {
@@ -2106,7 +2113,8 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
configNodeClient.dropPipeExtended(
new TDropPipeReq()
.setPipeName(dropPipeStatement.getPipeName())
-
.setIfExistsCondition(dropPipeStatement.hasIfExistsCondition()));
+
.setIfExistsCondition(dropPipeStatement.hasIfExistsCondition())
+ .setIsTableModel(dropPipeStatement.isTableModel()));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
future.setException(new IoTDBException(tsStatus.message,
tsStatus.code));
} else {
@@ -2135,7 +2143,12 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final TSStatus tsStatus =
configNodeClient.stopPipe(stopPipeStatement.getPipeName());
+
+ final TSStatus tsStatus =
+ configNodeClient.stopPipeExtended(
+ new TStopPipeReq()
+ .setPipeName(stopPipeStatement.getPipeName())
+ .setIsTableModel(stopPipeStatement.isTableModel()));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
future.setException(new IoTDBException(tsStatus.message,
tsStatus.code));
} else {
@@ -2159,6 +2172,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
if (showPipesStatement.getWhereClause()) {
tShowPipeReq.setWhereClause(true);
}
+ tShowPipeReq.setIsTableModel(showPipesStatement.isTableModel());
final List<TShowPipeInfo> tShowPipeInfoList =
configNodeClient.showPipe(tShowPipeReq).getPipeInfoList();
ShowPipeTask.buildTSBlock(tShowPipeInfoList, future);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
index f6d0ebab2cb..159c6941628 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
@@ -37,13 +37,13 @@ public class AlterPipeTask implements IConfigTask {
private final AlterPipeStatement alterPipeStatement;
- public AlterPipeTask(AlterPipeStatement alterPipeStatement) {
+ public AlterPipeTask(final AlterPipeStatement alterPipeStatement) {
// support now() function
applyNowFunctionToExtractorAttributes(alterPipeStatement.getExtractorAttributes());
this.alterPipeStatement = alterPipeStatement;
}
- public AlterPipeTask(AlterPipe node) {
+ public AlterPipeTask(final AlterPipe node) {
alterPipeStatement = new AlterPipeStatement(StatementType.ALTER_PIPE);
alterPipeStatement.setPipeName(node.getPipeName());
alterPipeStatement.setIfExists(node.hasIfExistsCondition());
@@ -57,10 +57,12 @@ public class AlterPipeTask implements IConfigTask {
alterPipeStatement.setReplaceAllExtractorAttributes(node.isReplaceAllExtractorAttributes());
alterPipeStatement.setReplaceAllProcessorAttributes(node.isReplaceAllProcessorAttributes());
alterPipeStatement.setReplaceAllConnectorAttributes(node.isReplaceAllConnectorAttributes());
+
+ alterPipeStatement.setTableModel(true);
}
@Override
- public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
return configTaskExecutor.alterPipe(alterPipeStatement);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
index a28ca91f3c6..a10c6042241 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
@@ -32,18 +32,19 @@ public class DropPipeTask implements IConfigTask {
private final DropPipeStatement dropPipeStatement;
- public DropPipeTask(DropPipeStatement dropPipeStatement) {
+ public DropPipeTask(final DropPipeStatement dropPipeStatement) {
this.dropPipeStatement = dropPipeStatement;
}
- public DropPipeTask(DropPipe node) {
+ public DropPipeTask(final DropPipe node) {
dropPipeStatement = new DropPipeStatement(StatementType.DROP_PIPE);
dropPipeStatement.setPipeName(node.getPipeName());
dropPipeStatement.setIfExists(node.hasIfExistsCondition());
+ dropPipeStatement.setTableModel(true);
}
@Override
- public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
return configTaskExecutor.dropPipe(dropPipeStatement);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
index 6481d0713ac..c43a9953b58 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
@@ -52,10 +52,11 @@ public class ShowPipeTask implements IConfigTask {
this.showPipesStatement = showPipesStatement;
}
- public ShowPipeTask(ShowPipes node) {
+ public ShowPipeTask(final ShowPipes node) {
showPipesStatement = new ShowPipesStatement();
showPipesStatement.setPipeName(node.getPipeName());
showPipesStatement.setWhereClause(node.hasWhereClause());
+ showPipesStatement.setTableModel(true);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
index 39ff162a43c..c118f0a3a58 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
@@ -32,17 +32,18 @@ public class StartPipeTask implements IConfigTask {
private final StartPipeStatement startPipeStatement;
- public StartPipeTask(StartPipeStatement startPipeStatement) {
+ public StartPipeTask(final StartPipeStatement startPipeStatement) {
this.startPipeStatement = startPipeStatement;
}
- public StartPipeTask(StartPipe node) {
+ public StartPipeTask(final StartPipe node) {
startPipeStatement = new StartPipeStatement(StatementType.START_PIPE);
startPipeStatement.setPipeName(node.getPipeName());
+ startPipeStatement.setTableModel(true);
}
@Override
- public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
return configTaskExecutor.startPipe(startPipeStatement);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
index 04806bef42f..91764c43263 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
@@ -32,17 +32,18 @@ public class StopPipeTask implements IConfigTask {
private final StopPipeStatement stopPipeStatement;
- public StopPipeTask(StopPipeStatement stopPipeStatement) {
+ public StopPipeTask(final StopPipeStatement stopPipeStatement) {
this.stopPipeStatement = stopPipeStatement;
}
- public StopPipeTask(StopPipe node) {
+ public StopPipeTask(final StopPipe node) {
stopPipeStatement = new StopPipeStatement(StatementType.STOP_PIPE);
stopPipeStatement.setPipeName(node.getPipeName());
+ stopPipeStatement.setTableModel(true);
}
@Override
- public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
return configTaskExecutor.stopPipe(stopPipeStatement);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterPipe.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterPipe.java
index 4d398b55fcc..a40fab917a5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterPipe.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterPipe.java
@@ -88,7 +88,7 @@ public class AlterPipe extends PipeStatement {
}
@Override
- public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+ public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
return visitor.visitAlterPipe(this, context);
}
@@ -106,25 +106,25 @@ public class AlterPipe extends PipeStatement {
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- AlterPipe alterPipe = (AlterPipe) obj;
- return Objects.equals(pipeName, alterPipe.pipeName)
- && Objects.equals(ifExistsCondition, alterPipe.ifExistsCondition)
- && Objects.equals(extractorAttributes, alterPipe.extractorAttributes)
- && Objects.equals(processorAttributes, alterPipe.processorAttributes)
- && Objects.equals(connectorAttributes, alterPipe.connectorAttributes)
+ final AlterPipe that = (AlterPipe) obj;
+ return Objects.equals(this.pipeName, that.pipeName)
+ && Objects.equals(this.ifExistsCondition, that.ifExistsCondition)
+ && Objects.equals(this.extractorAttributes, that.extractorAttributes)
+ && Objects.equals(this.processorAttributes, that.processorAttributes)
+ && Objects.equals(this.connectorAttributes, that.connectorAttributes)
&& Objects.equals(
- isReplaceAllExtractorAttributes,
alterPipe.isReplaceAllExtractorAttributes)
+ this.isReplaceAllExtractorAttributes,
that.isReplaceAllExtractorAttributes)
&& Objects.equals(
- isReplaceAllProcessorAttributes,
alterPipe.isReplaceAllProcessorAttributes)
+ this.isReplaceAllProcessorAttributes,
that.isReplaceAllProcessorAttributes)
&& Objects.equals(
- isReplaceAllConnectorAttributes,
alterPipe.isReplaceAllConnectorAttributes);
+ this.isReplaceAllConnectorAttributes,
that.isReplaceAllConnectorAttributes);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
index 6952c06d13d..1b3837ba214 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
@@ -43,7 +43,7 @@ public class DropPipe extends PipeStatement {
}
@Override
- public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+ public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
return visitor.visitDropPipe(this, context);
}
@@ -53,16 +53,16 @@ public class DropPipe extends PipeStatement {
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- DropPipe other = (DropPipe) obj;
- return Objects.equals(pipeName, other.pipeName)
- && Objects.equals(ifExistsCondition, other.ifExistsCondition);
+ final DropPipe that = (DropPipe) obj;
+ return Objects.equals(this.pipeName, that.pipeName)
+ && Objects.equals(this.ifExistsCondition, that.ifExistsCondition);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowPipes.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowPipes.java
index e6507961093..c0dfe9379de 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowPipes.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowPipes.java
@@ -44,7 +44,7 @@ public class ShowPipes extends PipeStatement {
}
@Override
- public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+ public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
return visitor.visitShowPipes(this, context);
}
@@ -54,16 +54,16 @@ public class ShowPipes extends PipeStatement {
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- ShowPipes other = (ShowPipes) obj;
- return Objects.equals(pipeName, other.pipeName)
- && Objects.equals(hasWhereClause, other.hasWhereClause);
+ final ShowPipes that = (ShowPipes) obj;
+ return Objects.equals(this.pipeName, that.pipeName)
+ && Objects.equals(this.hasWhereClause, that.hasWhereClause);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java
index 540d6fef9ec..15eb5b2413d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java
@@ -37,7 +37,7 @@ public class StartPipe extends PipeStatement {
}
@Override
- public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+ public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
return visitor.visitStartPipe(this, context);
}
@@ -47,15 +47,15 @@ public class StartPipe extends PipeStatement {
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- StartPipe other = (StartPipe) obj;
- return Objects.equals(pipeName, other.pipeName);
+ final StartPipe that = (StartPipe) obj;
+ return Objects.equals(this.pipeName, that.pipeName);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
index daa9ab32e02..6d1a629de88 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
@@ -37,7 +37,7 @@ public class StopPipe extends PipeStatement {
}
@Override
- public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+ public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
return visitor.visitStopPipe(this, context);
}
@@ -47,15 +47,15 @@ public class StopPipe extends PipeStatement {
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- StopPipe other = (StopPipe) obj;
- return Objects.equals(pipeName, other.pipeName);
+ final StopPipe that = (StopPipe) obj;
+ return Objects.equals(this.pipeName, that.pipeName);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
index de5dc3c1d59..44065cf3185 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
@@ -44,8 +44,9 @@ public class AlterPipeStatement extends Statement implements
IConfigStatement {
private boolean isReplaceAllExtractorAttributes;
private boolean isReplaceAllProcessorAttributes;
private boolean isReplaceAllConnectorAttributes;
+ private boolean isTableModel;
- public AlterPipeStatement(StatementType alterPipeStatement) {
+ public AlterPipeStatement(final StatementType alterPipeStatement) {
this.statementType = alterPipeStatement;
}
@@ -81,38 +82,46 @@ public class AlterPipeStatement extends Statement
implements IConfigStatement {
return isReplaceAllConnectorAttributes;
}
- public void setPipeName(String pipeName) {
+ public boolean isTableModel() {
+ return isTableModel;
+ }
+
+ public void setPipeName(final String pipeName) {
this.pipeName = pipeName;
}
- public void setIfExists(boolean ifExistsCondition) {
+ public void setIfExists(final boolean ifExistsCondition) {
this.ifExistsCondition = ifExistsCondition;
}
- public void setExtractorAttributes(Map<String, String> extractorAttributes) {
+ public void setExtractorAttributes(final Map<String, String>
extractorAttributes) {
this.extractorAttributes = extractorAttributes;
}
- public void setProcessorAttributes(Map<String, String> processorAttributes) {
+ public void setProcessorAttributes(final Map<String, String>
processorAttributes) {
this.processorAttributes = processorAttributes;
}
- public void setConnectorAttributes(Map<String, String> connectorAttributes) {
+ public void setConnectorAttributes(final Map<String, String>
connectorAttributes) {
this.connectorAttributes = connectorAttributes;
}
- public void setReplaceAllExtractorAttributes(boolean
replaceAllExtractorAttributes) {
+ public void setReplaceAllExtractorAttributes(final boolean
replaceAllExtractorAttributes) {
isReplaceAllExtractorAttributes = replaceAllExtractorAttributes;
}
- public void setReplaceAllProcessorAttributes(boolean
replaceAllProcessorAttributes) {
+ public void setReplaceAllProcessorAttributes(final boolean
replaceAllProcessorAttributes) {
isReplaceAllProcessorAttributes = replaceAllProcessorAttributes;
}
- public void setReplaceAllConnectorAttributes(boolean
replaceAllConnectorAttributes) {
+ public void setReplaceAllConnectorAttributes(final boolean
replaceAllConnectorAttributes) {
isReplaceAllConnectorAttributes = replaceAllConnectorAttributes;
}
+ public void setTableModel(final boolean tableModel) {
+ this.isTableModel = tableModel;
+ }
+
@Override
public QueryType getQueryType() {
return QueryType.WRITE;
@@ -124,7 +133,7 @@ public class AlterPipeStatement extends Statement
implements IConfigStatement {
}
@Override
- public TSStatus checkPermissionBeforeProcess(String userName) {
+ public TSStatus checkPermissionBeforeProcess(final String userName) {
if (AuthorityChecker.SUPER_USER.equals(userName)) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@@ -134,7 +143,7 @@ public class AlterPipeStatement extends Statement
implements IConfigStatement {
}
@Override
- public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ public <R, C> R accept(final StatementVisitor<R, C> visitor, final C
context) {
return visitor.visitAlterPipe(this, context);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
index a3403e00e68..c61a15d777d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
@@ -37,27 +37,36 @@ public class DropPipeStatement extends Statement implements
IConfigStatement {
private String pipeName;
private boolean ifExistsCondition;
+ private boolean isTableModel;
- public DropPipeStatement(StatementType dropPipeStatement) {
+ public DropPipeStatement(final StatementType dropPipeStatement) {
this.statementType = dropPipeStatement;
}
+ public String getPipeName() {
+ return pipeName;
+ }
+
public boolean hasIfExistsCondition() {
return ifExistsCondition;
}
- public String getPipeName() {
- return pipeName;
+ public boolean isTableModel() {
+ return isTableModel;
}
- public void setPipeName(String pipeName) {
+ public void setPipeName(final String pipeName) {
this.pipeName = pipeName;
}
- public void setIfExists(boolean ifExistsCondition) {
+ public void setIfExists(final boolean ifExistsCondition) {
this.ifExistsCondition = ifExistsCondition;
}
+ public void setTableModel(final boolean tableModel) {
+ this.isTableModel = tableModel;
+ }
+
@Override
public QueryType getQueryType() {
return QueryType.WRITE;
@@ -69,7 +78,7 @@ public class DropPipeStatement extends Statement implements
IConfigStatement {
}
@Override
- public TSStatus checkPermissionBeforeProcess(String userName) {
+ public TSStatus checkPermissionBeforeProcess(final String userName) {
if (AuthorityChecker.SUPER_USER.equals(userName)) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@@ -79,7 +88,7 @@ public class DropPipeStatement extends Statement implements
IConfigStatement {
}
@Override
- public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ public <R, C> R accept(final StatementVisitor<R, C> visitor, final C
context) {
return visitor.visitDropPipe(this, context);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/ShowPipesStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/ShowPipesStatement.java
index 54fe1099f39..658f36a0635 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/ShowPipesStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/ShowPipesStatement.java
@@ -39,6 +39,8 @@ public class ShowPipesStatement extends ShowStatement
implements IConfigStatemen
private boolean whereClause;
+ private boolean isTableModel;
+
public String getPipeName() {
return pipeName;
}
@@ -47,21 +49,29 @@ public class ShowPipesStatement extends ShowStatement
implements IConfigStatemen
return whereClause;
}
- public void setPipeName(String pipeName) {
+ public boolean isTableModel() {
+ return isTableModel;
+ }
+
+ public void setPipeName(final String pipeName) {
this.pipeName = pipeName;
}
- public void setWhereClause(boolean whereClause) {
+ public void setWhereClause(final boolean whereClause) {
this.whereClause = whereClause;
}
+ public void setTableModel(final boolean tableModel) {
+ this.isTableModel = tableModel;
+ }
+
@Override
public QueryType getQueryType() {
return QueryType.READ;
}
@Override
- public TSStatus checkPermissionBeforeProcess(String userName) {
+ public TSStatus checkPermissionBeforeProcess(final String userName) {
if (AuthorityChecker.SUPER_USER.equals(userName)) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@@ -71,7 +81,7 @@ public class ShowPipesStatement extends ShowStatement
implements IConfigStatemen
}
@Override
- public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ public <R, C> R accept(final StatementVisitor<R, C> visitor, final C
context) {
return visitor.visitShowPipes(this, context);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StartPipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StartPipeStatement.java
index 43b61a1fb44..9023d77c06e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StartPipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StartPipeStatement.java
@@ -36,8 +36,9 @@ import java.util.List;
public class StartPipeStatement extends Statement implements IConfigStatement {
private String pipeName;
+ private boolean isTableModel;
- public StartPipeStatement(StatementType startPipeStatement) {
+ public StartPipeStatement(final StatementType startPipeStatement) {
this.statementType = startPipeStatement;
}
@@ -45,10 +46,18 @@ public class StartPipeStatement extends Statement
implements IConfigStatement {
return pipeName;
}
- public void setPipeName(String pipeName) {
+ public boolean isTableModel() {
+ return isTableModel;
+ }
+
+ public void setPipeName(final String pipeName) {
this.pipeName = pipeName;
}
+ public void setTableModel(final boolean tableModel) {
+ this.isTableModel = tableModel;
+ }
+
@Override
public QueryType getQueryType() {
return QueryType.WRITE;
@@ -60,7 +69,7 @@ public class StartPipeStatement extends Statement implements
IConfigStatement {
}
@Override
- public TSStatus checkPermissionBeforeProcess(String userName) {
+ public TSStatus checkPermissionBeforeProcess(final String userName) {
if (AuthorityChecker.SUPER_USER.equals(userName)) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@@ -70,7 +79,7 @@ public class StartPipeStatement extends Statement implements
IConfigStatement {
}
@Override
- public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ public <R, C> R accept(final StatementVisitor<R, C> visitor, final C
context) {
return visitor.visitStartPipe(this, context);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StopPipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StopPipeStatement.java
index d463fda199d..ea14a3dbd1b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StopPipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StopPipeStatement.java
@@ -36,8 +36,9 @@ import java.util.List;
public class StopPipeStatement extends Statement implements IConfigStatement {
private String pipeName;
+ private boolean isTableModel;
- public StopPipeStatement(StatementType stopPipeStatement) {
+ public StopPipeStatement(final StatementType stopPipeStatement) {
this.statementType = stopPipeStatement;
}
@@ -45,10 +46,18 @@ public class StopPipeStatement extends Statement implements
IConfigStatement {
return pipeName;
}
- public void setPipeName(String pipeName) {
+ public boolean isTableModel() { // value last passed to setTableModel; false if it was never called
+ return isTableModel;
+ }
+
+ public void setPipeName(final String pipeName) {
this.pipeName = pipeName;
}
+ public void setTableModel(final boolean tableModel) { // table-model flag, read back via isTableModel()
+ this.isTableModel = tableModel;
+ }
+
@Override
public QueryType getQueryType() {
return QueryType.WRITE;
@@ -60,7 +69,7 @@ public class StopPipeStatement extends Statement implements
IConfigStatement {
}
@Override
- public TSStatus checkPermissionBeforeProcess(String userName) {
+ public TSStatus checkPermissionBeforeProcess(final String userName) {
if (AuthorityChecker.SUPER_USER.equals(userName)) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@@ -70,7 +79,7 @@ public class StopPipeStatement extends Statement implements
IConfigStatement {
}
@Override
- public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ public <R, C> R accept(final StatementVisitor<R, C> visitor, final C
context) {
return visitor.visitStopPipe(this, context);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
index 997278010e9..033fe650582 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
@@ -19,6 +19,10 @@
package org.apache.iotdb.commons.pipe.agent.task.meta;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
import org.apache.tsfile.utils.PublicBAOS;
import java.io.DataOutputStream;
@@ -26,6 +30,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Objects;
public class PipeMeta {
@@ -61,6 +66,41 @@ public class PipeMeta {
return temporaryMeta;
}
+ public boolean visibleUnder(final boolean isTableModel) { // isTableModel: true asks about the table model, false about the tree model
+ final PipeParameters extractorParameters =
getStaticMeta().getExtractorParameters();
+
+ // visible under both the tree and the table model when 'mode.double-living' is set to true
+ final boolean isDoubleLiving =
+ extractorParameters.getBooleanOrDefault(
+ Arrays.asList(
+ PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+ PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+ PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+ if (isDoubleLiving) {
+ return true;
+ }
+
+ final boolean isTreeDialect = // a missing sql dialect counts as the tree dialect
+ extractorParameters
+ .getStringOrDefault(
+ SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE)
+ .equals(SystemConstant.SQL_DIALECT_TREE_VALUE);
+ final Boolean _isCaptureTree = // boxed so that an unset value can be told apart (null-checked below)
+ extractorParameters.getBooleanByKeys(
+ PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY,
+ PipeExtractorConstant.SOURCE_CAPTURE_TREE_KEY);
+ final boolean isCaptureTree = Objects.nonNull(_isCaptureTree) ?
_isCaptureTree : isTreeDialect; // unset -> follow the dialect
+ final Boolean _isCaptureTable = // boxed for the same reason as _isCaptureTree
+ extractorParameters.getBooleanByKeys(
+ PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY,
+ PipeExtractorConstant.SOURCE_CAPTURE_TABLE_KEY);
+ final boolean isCaptureTable =
+ Objects.nonNull(_isCaptureTable) ? _isCaptureTable : !isTreeDialect; // unset -> captured only when the dialect is not tree
+
+ // visible under specific tree or table model <-> actually capture tree or
table data
+ return isTableModel ? isCaptureTable : isCaptureTree;
+ }
+
public ByteBuffer serialize() throws IOException {
final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMetaKeeper.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMetaKeeper.java
index f8a8c9e55ac..4009288dfc7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMetaKeeper.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMetaKeeper.java
@@ -85,6 +85,14 @@ public class PipeMetaKeeper {
return pipeNameToPipeMetaMap.containsKey(pipeName);
}
+ public boolean containsPipeMeta(String pipeName, boolean isTableModel) { // existence check filtered by model visibility
+ final PipeMeta pipeMeta = pipeNameToPipeMetaMap.get(pipeName);
+ if (Objects.isNull(pipeMeta)) { // no pipe is registered under this name
+ return false;
+ }
+ return pipeMeta.visibleUnder(isTableModel); // registered, but may still be hidden from the requested model
+ }
+
public Iterable<PipeMeta> getPipeMetaList() {
return pipeNameToPipeMetaMap.values();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index 1335b995a59..d5d34c26920 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -119,6 +119,9 @@ public class PipeExtractorConstant {
public static final String EXTRACTOR_MODE_SNAPSHOT_KEY =
"extractor.mode.snapshot";
public static final String SOURCE_MODE_SNAPSHOT_KEY = "source.mode.snapshot";
public static final boolean EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE = false;
+ public static final String EXTRACTOR_MODE_DOUBLE_LIVING_KEY =
"extractor.mode.double-living";
+ public static final String SOURCE_MODE_DOUBLE_LIVING_KEY =
"source.mode.double-living";
+ public static final boolean EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE =
false;
public static final String EXTRACTOR_START_TIME_KEY = "extractor.start-time";
public static final String SOURCE_START_TIME_KEY = "source.start-time";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
index 963595c62a6..00462cb5eb4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
@@ -102,15 +102,22 @@ public class TablePattern {
*/
public static TablePattern parsePipePatternFromSourceParameters(
final PipeParameters sourceParameters) {
- final boolean isTableModelDataAllowedToBeCaptured =
+ final boolean isDoubleLiving =
sourceParameters.getBooleanOrDefault(
Arrays.asList(
- PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY,
- PipeExtractorConstant.SOURCE_CAPTURE_TABLE_KEY),
- !sourceParameters
- .getStringOrDefault(
- SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE)
- .equals(SystemConstant.SQL_DIALECT_TREE_VALUE));
+ PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+ PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+ PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+ final boolean isTableModelDataAllowedToBeCaptured =
+ isDoubleLiving
+ || sourceParameters.getBooleanOrDefault(
+ Arrays.asList(
+ PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY,
+ PipeExtractorConstant.SOURCE_CAPTURE_TABLE_KEY),
+ !sourceParameters
+ .getStringOrDefault(
+ SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE)
+ .equals(SystemConstant.SQL_DIALECT_TREE_VALUE));
final String databaseNamePattern =
sourceParameters.getStringOrDefault(
Arrays.asList(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
index 7ee8f6dd88d..f03f0a29186 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
@@ -70,15 +70,23 @@ public abstract class TreePattern {
*/
public static TreePattern parsePipePatternFromSourceParameters(
final PipeParameters sourceParameters) {
- final boolean isTreeModelDataAllowedToBeCaptured =
+ final boolean isDoubleLiving =
sourceParameters.getBooleanOrDefault(
Arrays.asList(
- PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY,
- PipeExtractorConstant.SOURCE_CAPTURE_TREE_KEY),
- sourceParameters
- .getStringOrDefault(
- SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE)
- .equals(SystemConstant.SQL_DIALECT_TREE_VALUE));
+ PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+ PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+ PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+
+ final boolean isTreeModelDataAllowedToBeCaptured =
+ isDoubleLiving
+ || sourceParameters.getBooleanOrDefault(
+ Arrays.asList(
+ PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY,
+ PipeExtractorConstant.SOURCE_CAPTURE_TREE_KEY),
+ sourceParameters
+ .getStringOrDefault(
+ SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE)
+ .equals(SystemConstant.SQL_DIALECT_TREE_VALUE));
final String path = sourceParameters.getStringByKeys(EXTRACTOR_PATH_KEY,
SOURCE_PATH_KEY);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java
index 2d3a1615f00..cecee61c7fa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java
@@ -26,8 +26,10 @@ import org.apache.iotdb.pipe.api.PipeExtractor;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import java.util.Arrays;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE;
@@ -85,6 +87,51 @@ public abstract class IoTDBExtractor implements
PipeExtractor {
.getStringOrDefault(
Arrays.asList(EXTRACTOR_EXCLUSION_KEY,
SOURCE_EXCLUSION_KEY),
EXTRACTOR_EXCLUSION_DEFAULT_VALUE));
+
+ // Validate double living
+ validateDoubleLiving(validator.getParameters());
+ }
+
+ private void validateDoubleLiving(final PipeParameters parameters) { // throws PipeParameterNotValidException on settings that contradict double living
+ final boolean isDoubleLiving =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(
+ PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+ PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+ PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+ if (!isDoubleLiving) { // the checks below only apply when double living is enabled
+ return;
+ }
+
+ // check 'capture.tree': an unset value is accepted, an explicit false is rejected
+ final Boolean isCaptureTree =
+ parameters.getBooleanByKeys(
+ PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY,
+ PipeExtractorConstant.SOURCE_CAPTURE_TREE_KEY);
+ if (Objects.nonNull(isCaptureTree) && !isCaptureTree) {
+ throw new PipeParameterNotValidException(
+ "capture.tree can not be specified to false when double living is
enabled");
+ }
+
+ // check 'capture.table': an unset value is accepted, an explicit false is rejected
+ final Boolean isCaptureTable =
+ parameters.getBooleanByKeys(
+ PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY,
+ PipeExtractorConstant.SOURCE_CAPTURE_TABLE_KEY);
+ if (Objects.nonNull(isCaptureTable) && !isCaptureTable) {
+ throw new PipeParameterNotValidException(
+ "capture.table can not be specified to false when double living is
enabled");
+ }
+
+ // check 'forwarding-pipe-requests': an unset value is accepted, an explicit true is rejected
+ final Boolean isForwardingPipeRequests =
+ parameters.getBooleanByKeys(
+ PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+ PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY);
+ if (Objects.nonNull(isForwardingPipeRequests) && isForwardingPipeRequests)
{
+ throw new PipeParameterNotValidException(
+ "forwarding-pipe-requests can not be specified to true when double
living is enabled");
+ }
}
@Override
@@ -99,12 +146,22 @@ public abstract class IoTDBExtractor implements
PipeExtractor {
taskID = pipeName + "_" + regionId + "_" + creationTime;
pipeTaskMeta = environment.getPipeTaskMeta();
- isForwardingPipeRequests =
+ final boolean isDoubleLiving =
parameters.getBooleanOrDefault(
Arrays.asList(
- PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
- PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
-
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+ PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+ PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+ PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+ if (isDoubleLiving) {
+ isForwardingPipeRequests = false;
+ } else {
+ isForwardingPipeRequests =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(
+ PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+ PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
+
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+ }
}
@Override
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 26b1b426082..ebe67399552 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -751,11 +751,23 @@ struct TAlterPipeReq {
6: optional map<string, string> extractorAttributes
7: optional bool isReplaceAllExtractorAttributes
8: optional bool ifExistsCondition
+ 9: optional bool isTableModel
+}
+
+struct TStartPipeReq { // argument of startPipeExtended
+ 1: required string pipeName
+ 2: optional bool isTableModel
+}
+
+struct TStopPipeReq { // argument of stopPipeExtended
+ 1: required string pipeName
+ 2: optional bool isTableModel
}
struct TDropPipeReq {
1: required string pipeName
2: optional bool ifExistsCondition
+ 3: optional bool isTableModel
}
// Deprecated, restored for compatibility
@@ -768,6 +780,7 @@ struct TPipeSinkInfo {
struct TShowPipeReq {
1: optional string pipeName
2: optional bool whereClause
+ 3: optional bool isTableModel
}
struct TShowPipeResp {
@@ -1680,9 +1693,15 @@ service IConfigNodeRPCService {
/** Start Pipe */
common.TSStatus startPipe(string pipeName)
+ /** Start Pipe; takes a TStartPipeReq, which adds the optional isTableModel flag to the pipe name */
+ common.TSStatus startPipeExtended(TStartPipeReq req)
+
/** Stop Pipe */
common.TSStatus stopPipe(string pipeName)
+ /** Stop Pipe; takes a TStopPipeReq, which adds the optional isTableModel flag to the pipe name */
+ common.TSStatus stopPipeExtended(TStopPipeReq req)
+
/** Drop Pipe */
common.TSStatus dropPipe(string pipeName)