This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f01571f4c6 Pipe: tree/table model isolation for 
alter/start/stop/drop/show pipe operations & support 'mode.double-living' 
(#14386)
5f01571f4c6 is described below

commit 5f01571f4c64033c460545ff885f320433ff8bfc
Author: VGalaxies <[email protected]>
AuthorDate: Thu Jan 9 19:18:54 2025 +0800

    Pipe: tree/table model isolation for alter/start/stop/drop/show pipe 
operations & support 'mode.double-living' (#14386)
---
 .../iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java |   3 +-
 .../it/autocreate/IoTDBPipeSwitchStatusIT.java     |  23 +-
 .../iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java |  30 +-
 .../it/tablemodel/IoTDBPipeDoubleLivingIT.java     | 329 ++++++++++++++++++
 .../pipe/it/tablemodel/IoTDBPipeIsolationIT.java   | 377 +++++++++++++++++++++
 .../it/tablemodel/IoTDBPipeSwitchStatusIT.java     |  23 +-
 .../pipe/it/tablemodel/IoTDBPipeSyntaxIT.java      |  33 +-
 .../iotdb/pipe/it/tablemodel/TableModelUtils.java  |  17 +
 .../response/pipe/task/PipeTableResp.java          |   7 +
 .../iotdb/confignode/manager/ConfigManager.java    |  10 +-
 .../apache/iotdb/confignode/manager/IManager.java  |  23 +-
 .../pipe/coordinator/task/PipeTaskCoordinator.java |  74 ++--
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  21 +-
 .../impl/pipe/task/AlterPipeProcedureV2.java       |   4 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  16 +-
 .../dataregion/IoTDBDataRegionExtractor.java       |  20 +-
 .../iotdb/db/protocol/client/ConfigNodeClient.java |  14 +
 .../config/executor/ClusterConfigTaskExecutor.java |  20 +-
 .../execution/config/sys/pipe/AlterPipeTask.java   |   8 +-
 .../execution/config/sys/pipe/DropPipeTask.java    |   7 +-
 .../execution/config/sys/pipe/ShowPipeTask.java    |   3 +-
 .../execution/config/sys/pipe/StartPipeTask.java   |   7 +-
 .../execution/config/sys/pipe/StopPipeTask.java    |   7 +-
 .../plan/relational/sql/ast/AlterPipe.java         |  22 +-
 .../plan/relational/sql/ast/DropPipe.java          |  10 +-
 .../plan/relational/sql/ast/ShowPipes.java         |  10 +-
 .../plan/relational/sql/ast/StartPipe.java         |   8 +-
 .../plan/relational/sql/ast/StopPipe.java          |   8 +-
 .../metadata/pipe/AlterPipeStatement.java          |  31 +-
 .../statement/metadata/pipe/DropPipeStatement.java |  23 +-
 .../metadata/pipe/ShowPipesStatement.java          |  18 +-
 .../metadata/pipe/StartPipeStatement.java          |  17 +-
 .../statement/metadata/pipe/StopPipeStatement.java |  17 +-
 .../commons/pipe/agent/task/meta/PipeMeta.java     |  40 +++
 .../pipe/agent/task/meta/PipeMetaKeeper.java       |   8 +
 .../config/constant/PipeExtractorConstant.java     |   3 +
 .../pipe/datastructure/pattern/TablePattern.java   |  21 +-
 .../pipe/datastructure/pattern/TreePattern.java    |  22 +-
 .../commons/pipe/extractor/IoTDBExtractor.java     |  65 +++-
 .../src/main/thrift/confignode.thrift              |  19 ++
 40 files changed, 1239 insertions(+), 179 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
index 66bae32c5b1..a2f5c65cb73 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
-import org.apache.iotdb.itbase.env.BaseEnv;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -106,7 +105,7 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
     }
 
     // Alter pipe (modify)
-    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+    try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
       statement.execute("alter pipe a2b modify source 
('source.pattern'='root.test2')");
     } catch (SQLException e) {
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSwitchStatusIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSwitchStatusIT.java
index cdcc41ad53f..03e52a982e6 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSwitchStatusIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSwitchStatusIT.java
@@ -266,11 +266,14 @@ public class IoTDBPipeSwitchStatusIT extends 
AbstractPipeDualAutoIT {
                   .setExtractorAttributes(extractorAttributes)
                   .setProcessorAttributes(processorAttributes));
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.startPipe("").getCode());
       Assert.assertEquals(
-          TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.startPipe("p0").getCode());
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.startPipe("p").getCode());
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.startPipe("*").getCode());
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.startPipe("").getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.startPipe("p0").getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.startPipe("p").getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.startPipe("*").getCode());
       List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
       Assert.assertTrue(
           showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("RUNNING")));
@@ -281,10 +284,14 @@ public class IoTDBPipeSwitchStatusIT extends 
AbstractPipeDualAutoIT {
       Assert.assertTrue(
           showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("RUNNING")));
 
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.stopPipe("").getCode());
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.stopPipe("p0").getCode());
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.stopPipe("p").getCode());
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.stopPipe("*").getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.stopPipe("").getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.stopPipe("p0").getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.stopPipe("p").getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.stopPipe("*").getCode());
       showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
       Assert.assertTrue(
           showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("RUNNING")));
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java
index a81b02f7dd7..1665f9d2ce2 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java
@@ -65,7 +65,8 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeTableModelTestIT {
     long lastCreationTime;
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
       // Check status
       Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
@@ -96,7 +97,8 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeTableModelTestIT {
     // Show pipe
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
       // Check status
       Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
@@ -114,7 +116,8 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeTableModelTestIT {
     // Show pipe
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
       // Check status
       Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
@@ -149,7 +152,8 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeTableModelTestIT {
     // Show pipe
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
       // check status
       Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
@@ -183,7 +187,8 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeTableModelTestIT {
     // Show pipe
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
       // Check status
       Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
@@ -225,7 +230,8 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeTableModelTestIT {
     // Show pipe
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
       // Check status
       Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
@@ -257,7 +263,8 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeTableModelTestIT {
     // show pipe
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
       // check status
       Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
@@ -289,7 +296,8 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeTableModelTestIT {
     // show pipe
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
       // check status
       Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
@@ -323,7 +331,8 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeTableModelTestIT {
     // show pipe
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
       // check status
       Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
@@ -354,7 +363,8 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeTableModelTestIT {
     // Show pipe
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
       // Check status
       Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDoubleLivingIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDoubleLivingIT.java
new file mode 100644
index 00000000000..f6573728cf2
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDoubleLivingIT.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.tablemodel;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2TableModel;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2TableModel.class})
+public class IoTDBPipeDoubleLivingIT extends AbstractPipeTableModelTestIT {
+
+  @Test
+  public void testDoubleLivingInvalidParameter() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s"
+                  + " with source ("
+                  + "'capture.tree'='false',"
+                  + "'mode.double-living'='true')"
+                  + " with sink ("
+                  + "'node-urls'='%s')",
+              "p1", receiverDataNode.getIpAndPortString()));
+      fail();
+    } catch (final SQLException ignored) {
+    }
+
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s"
+                  + " with source ("
+                  + "'capture.table'='false',"
+                  + "'mode.double-living'='true')"
+                  + " with sink ("
+                  + "'node-urls'='%s')",
+              "p2", receiverDataNode.getIpAndPortString()));
+      fail();
+    } catch (final SQLException ignored) {
+    }
+
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s"
+                  + " with source ("
+                  + "'forwarding-pipe-requests'='true',"
+                  + "'mode.double-living'='true')"
+                  + " with sink ("
+                  + "'node-urls'='%s')",
+              "p3", receiverDataNode.getIpAndPortString()));
+      fail();
+    } catch (final SQLException ignored) {
+    }
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
+      Assert.assertEquals(0, showPipeResult.size());
+    }
+  }
+
+  // combination of 
org.apache.iotdb.pipe.it.tablemodel.IoTDBPipeLifeCycleIT.testDoubleLiving and
+  // org.apache.iotdb.pipe.it.autocreate.IoTDBPipeLifeCycleIT.testDoubleLiving
+  @Test
+  public void testBasicDoubleLiving() {
+    boolean insertResult;
+
+    final DataNodeWrapper senderDataNode = senderEnv.getDataNodeWrapper(0);
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+    final Consumer<String> handleFailure =
+        o -> {
+          TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+          TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+        };
+
+    // insertion on sender
+    for (int i = 0; i < 100; ++i) {
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i))) {
+        return;
+      }
+    }
+    TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
+    insertResult = TableModelUtils.insertData("test", "test", 0, 100, 
senderEnv);
+    if (!insertResult) {
+      return;
+    }
+    if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+      return;
+    }
+
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s"
+                  + " with source ("
+                  + "'database-name'='test',"
+                  + "'table-name'='test',"
+                  + "'path'='root.db.d1.s1',"
+                  + "'mode.double-living'='true')"
+                  + " with sink ("
+                  + "'batch.enable'='false',"
+                  + "'node-urls'='%s')",
+              "p1", receiverDataNode.getIpAndPortString()));
+    } catch (final SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // insertion on sender
+    for (int i = 100; i < 200; ++i) {
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i))) {
+        return;
+      }
+    }
+    for (int i = 200; i < 300; ++i) {
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          receiverEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i))) {
+        return;
+      }
+    }
+    insertResult = TableModelUtils.insertData("test", "test", 100, 200, 
senderEnv);
+    if (!insertResult) {
+      return;
+    }
+    insertResult = TableModelUtils.insertData("test", "test", 200, 300, 
receiverEnv);
+    if (!insertResult) {
+      return;
+    }
+    if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+      return;
+    }
+
+    try (final Connection connection = 
receiverEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s"
+                  + " with source ("
+                  + "'database-name'='test',"
+                  + "'table-name'='test',"
+                  + "'path'='root.db.d1.s1',"
+                  + "'mode.double-living'='true')"
+                  + " with sink ("
+                  + "'batch.enable'='false',"
+                  + "'node-urls'='%s')",
+              "p2", senderDataNode.getIpAndPortString()));
+    } catch (final SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // insertion on receiver
+    for (int i = 300; i < 400; ++i) {
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          receiverEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i))) {
+        return;
+      }
+    }
+    insertResult = TableModelUtils.insertData("test", "test", 300, 400, 
receiverEnv);
+    if (!insertResult) {
+      return;
+    }
+    if (!TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, "flush")) {
+      return;
+    }
+
+    // check result
+    final Set<String> expectedResSet = new HashSet<>();
+    for (int i = 0; i < 400; ++i) {
+      expectedResSet.add(i + ",1.0,");
+    }
+    TestUtils.assertDataEventuallyOnEnv(
+        senderEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
+    TableModelUtils.assertData("test", "test", 0, 400, senderEnv, 
handleFailure);
+    TableModelUtils.assertData("test", "test", 0, 400, receiverEnv, 
handleFailure);
+
+    // restart cluster
+    try {
+      TestUtils.restartCluster(senderEnv);
+      TestUtils.restartCluster(receiverEnv);
+    } catch (final Throwable e) {
+      e.printStackTrace();
+      return;
+    }
+
+    // insertion on receiver
+    for (int i = 400; i < 500; ++i) {
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          receiverEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i))) {
+        return;
+      }
+    }
+    insertResult = TableModelUtils.insertData("test", "test", 400, 500, 
receiverEnv);
+    if (!insertResult) {
+      return;
+    }
+    if (!TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, "flush")) {
+      return;
+    }
+
+    // check result
+    for (int i = 400; i < 500; ++i) {
+      expectedResSet.add(i + ",1.0,");
+    }
+    TestUtils.assertDataEventuallyOnEnv(
+        senderEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
+    TableModelUtils.assertData("test", "test", 0, 500, senderEnv, 
handleFailure);
+    TableModelUtils.assertData("test", "test", 0, 500, receiverEnv, 
handleFailure);
+  }
+
+  @Test
+  public void testDoubleLivingIsolation() throws Exception {
+    final String treePipeName = "treePipe";
+    final String tablePipeName = "tablePipe";
+
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    // Create tree pipe
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s"
+                  + " with source ("
+                  + "'mode.double-living'='true')"
+                  + " with sink ("
+                  + "'node-urls'='%s')",
+              treePipeName, receiverDataNode.getIpAndPortString()));
+    } catch (final SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Show tree pipe by tree session
+    Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TREE_SQL_DIALECT));
+
+    // Show table pipe by table session
+    Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TABLE_SQL_DIALECT));
+
+    // Create table pipe
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s"
+                  + " with source ("
+                  + "'mode.double-living'='true')"
+                  + " with sink ("
+                  + "'node-urls'='%s')",
+              tablePipeName, receiverDataNode.getIpAndPortString()));
+    } catch (final SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Show tree pipe by tree session
+    Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TREE_SQL_DIALECT));
+
+    // Show table pipe by table session
+    Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TABLE_SQL_DIALECT));
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      // Drop pipe
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client.dropPipeExtended(new 
TDropPipeReq(treePipeName).setIsTableModel(true)).getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client
+              .dropPipeExtended(new 
TDropPipeReq(tablePipeName).setIsTableModel(false))
+              .getCode());
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeIsolationIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeIsolationIT.java
new file mode 100644
index 00000000000..57a3be38e21
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeIsolationIT.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.tablemodel;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2TableModel;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2TableModel.class})
+public class IoTDBPipeIsolationIT extends AbstractPipeTableModelTestIT {
+
+  @Test
+  public void testWritePipeIsolation() throws Exception {
+    final String treePipeName = "treePipe";
+    final String tablePipeName = "tablePipe";
+
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    // Create tree pipe
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s with sink ('node-urls'='%s')",
+              treePipeName, receiverDataNode.getIpAndPortString()));
+    } catch (final SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Create table pipe
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s with sink ('node-urls'='%s')",
+              tablePipeName, receiverDataNode.getIpAndPortString()));
+    } catch (final SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      // Start pipe
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+          client
+              .startPipeExtended(new 
TStartPipeReq(treePipeName).setIsTableModel(true))
+              .getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+          client
+              .startPipeExtended(new 
TStartPipeReq(tablePipeName).setIsTableModel(false))
+              .getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client
+              .startPipeExtended(new 
TStartPipeReq(treePipeName).setIsTableModel(false))
+              .getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client
+              .startPipeExtended(new 
TStartPipeReq(tablePipeName).setIsTableModel(true))
+              .getCode());
+
+      // Stop pipe
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+          client.stopPipeExtended(new 
TStopPipeReq(treePipeName).setIsTableModel(true)).getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+          client
+              .stopPipeExtended(new 
TStopPipeReq(tablePipeName).setIsTableModel(false))
+              .getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client.stopPipeExtended(new 
TStopPipeReq(treePipeName).setIsTableModel(false)).getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client.stopPipeExtended(new 
TStopPipeReq(tablePipeName).setIsTableModel(true)).getCode());
+
+      // Alter pipe
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+          client
+              .alterPipe(
+                  new TAlterPipeReq(
+                          treePipeName,
+                          Collections.emptyMap(),
+                          Collections.emptyMap(),
+                          false,
+                          false)
+                      .setExtractorAttributes(Collections.emptyMap())
+                      .setIsReplaceAllExtractorAttributes(false)
+                      .setIsTableModel(true))
+              .getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+          client
+              .alterPipe(
+                  new TAlterPipeReq(
+                          tablePipeName,
+                          Collections.emptyMap(),
+                          Collections.emptyMap(),
+                          false,
+                          false)
+                      .setExtractorAttributes(Collections.emptyMap())
+                      .setIsReplaceAllExtractorAttributes(false)
+                      .setIsTableModel(false))
+              .getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client
+              .alterPipe(
+                  new TAlterPipeReq(
+                          treePipeName,
+                          Collections.emptyMap(),
+                          Collections.emptyMap(),
+                          false,
+                          false)
+                      .setExtractorAttributes(Collections.emptyMap())
+                      .setIsReplaceAllExtractorAttributes(false)
+                      .setIsTableModel(false))
+              .getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client
+              .alterPipe(
+                  new TAlterPipeReq(
+                          tablePipeName,
+                          Collections.emptyMap(),
+                          Collections.emptyMap(),
+                          false,
+                          false)
+                      .setExtractorAttributes(Collections.emptyMap())
+                      .setIsReplaceAllExtractorAttributes(false)
+                      .setIsTableModel(true))
+              .getCode());
+
+      // Drop pipe
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+          client.dropPipeExtended(new 
TDropPipeReq(treePipeName).setIsTableModel(true)).getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
+          client
+              .dropPipeExtended(new 
TDropPipeReq(tablePipeName).setIsTableModel(false))
+              .getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client.dropPipeExtended(new 
TDropPipeReq(treePipeName).setIsTableModel(false)).getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client.dropPipeExtended(new 
TDropPipeReq(tablePipeName).setIsTableModel(true)).getCode());
+    }
+  }
+
+  @Test
+  public void testReadPipeIsolation() {
+    final String treePipeName = "treePipe";
+    final String tablePipeName = "tablePipe";
+
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    // 1. Create tree pipe by tree session
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s with sink ('node-urls'='%s')",
+              treePipeName, receiverDataNode.getIpAndPortString()));
+    } catch (final SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // Show tree pipe by tree session
+    Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TREE_SQL_DIALECT));
+
+    // Show table pipe by table session
+    Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TABLE_SQL_DIALECT));
+
+    // 2. Create table pipe by table session
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s with sink ('node-urls'='%s')",
+              tablePipeName, receiverDataNode.getIpAndPortString()));
+    } catch (final SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // Show tree pipe by tree session
+    Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TREE_SQL_DIALECT));
+
+    // Show table pipe by table session
+    Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TABLE_SQL_DIALECT));
+  }
+
+  @Test
+  public void testCaptureTreeAndTableIsolation() throws Exception {
+    final String treePipeName = "tree_a2b";
+    final String tablePipeName = "table_a2b";
+
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    // 1. Create tree pipe by tree session
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s"
+                  + " with source ("
+                  + "'capture.tree'='true',"
+                  + "'capture.table'='true')"
+                  + " with sink ("
+                  + "'node-urls'='%s')",
+              treePipeName, receiverDataNode.getIpAndPortString()));
+    } catch (final SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Show tree pipe by tree session
+    Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TREE_SQL_DIALECT));
+
+    // Show table pipe by table session
+    Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TABLE_SQL_DIALECT));
+
+    // 2. Create table pipe by table session
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s"
+                  + " with source ("
+                  + "'capture.tree'='true',"
+                  + "'capture.table'='true')"
+                  + " with sink ("
+                  + "'node-urls'='%s')",
+              tablePipeName, receiverDataNode.getIpAndPortString()));
+    } catch (final SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Show tree pipe by tree session
+    Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TREE_SQL_DIALECT));
+
+    // Show table pipe by table session
+    Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TABLE_SQL_DIALECT));
+
+    // 3. Drop pipe
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client.dropPipeExtended(new 
TDropPipeReq(treePipeName).setIsTableModel(true)).getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client
+              .dropPipeExtended(new 
TDropPipeReq(tablePipeName).setIsTableModel(false))
+              .getCode());
+    }
+  }
+
+  @Test
+  public void testCaptureCornerCases() {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    // 1. Create tree pipe but capture table data
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s"
+                  + " with source ("
+                  + "'capture.tree'='false',"
+                  + "'capture.table'='true')"
+                  + " with sink ("
+                  + "'node-urls'='%s')",
+              "p1", receiverDataNode.getIpAndPortString()));
+    } catch (final SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Show tree pipe by tree session
+    Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TREE_SQL_DIALECT));
+
+    // Show table pipe by table session
+    Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TABLE_SQL_DIALECT));
+
+    // 2. Create table pipe but capture tree data
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s"
+                  + " with source ("
+                  + "'capture.tree'='true',"
+                  + "'capture.table'='false')"
+                  + " with sink ("
+                  + "'node-urls'='%s')",
+              "p2", receiverDataNode.getIpAndPortString()));
+    } catch (final SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Show tree pipe by tree session
+    Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TREE_SQL_DIALECT));
+
+    // Show table pipe by table session
+    Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TABLE_SQL_DIALECT));
+
+    // 3. Create pipe with capture.tree and capture.table set to false
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe %s"
+                  + " with source ("
+                  + "'capture.tree'='false',"
+                  + "'capture.table'='false')"
+                  + " with sink ("
+                  + "'node-urls'='%s')",
+              "p3", receiverDataNode.getIpAndPortString()));
+      fail();
+    } catch (final SQLException ignored) {
+    }
+
+    // Show tree pipe by tree session
+    Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TREE_SQL_DIALECT));
+
+    // Show table pipe by table session
+    Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, 
BaseEnv.TABLE_SQL_DIALECT));
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSwitchStatusIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSwitchStatusIT.java
index 5c5cbcd7b95..4c80f0ae673 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSwitchStatusIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSwitchStatusIT.java
@@ -303,11 +303,14 @@ public class IoTDBPipeSwitchStatusIT extends 
AbstractPipeTableModelTestIT {
                   .setExtractorAttributes(extractorAttributes)
                   .setProcessorAttributes(processorAttributes));
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.startPipe("").getCode());
       Assert.assertEquals(
-          TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.startPipe("p0").getCode());
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.startPipe("p").getCode());
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.startPipe("*").getCode());
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.startPipe("").getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.startPipe("p0").getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.startPipe("p").getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.startPipe("*").getCode());
       List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
       Assert.assertTrue(
           showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("RUNNING")));
@@ -318,10 +321,14 @@ public class IoTDBPipeSwitchStatusIT extends 
AbstractPipeTableModelTestIT {
       Assert.assertTrue(
           showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("RUNNING")));
 
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.stopPipe("").getCode());
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.stopPipe("p0").getCode());
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.stopPipe("p").getCode());
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
client.stopPipe("*").getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.stopPipe("").getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.stopPipe("p0").getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.stopPipe("p").getCode());
+      Assert.assertEquals(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), 
client.stopPipe("*").getCode());
       showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
       Assert.assertTrue(
           showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("RUNNING")));
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSyntaxIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSyntaxIT.java
index 24b1c92563b..09658a583b0 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSyntaxIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeSyntaxIT.java
@@ -89,7 +89,8 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeTableModelTestIT {
         }
       }
 
-      List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       for (final String pipeName : expectedPipeNames) {
         Assert.assertTrue(
             showPipeResult.stream()
@@ -106,7 +107,7 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeTableModelTestIT {
         }
       }
 
-      showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+      showPipeResult = client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(0, showPipeResult.size());
     }
   }
@@ -124,7 +125,7 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeTableModelTestIT {
           String.format(
               "create pipe p1"
                   + " with source ( "
-                  + "'capture.table'='test',"
+                  + "'capture.table'='true',"
                   + "'database-name'='test',"
                   + "'table-name'='test',"
                   + "'mode.streaming'='true',"
@@ -177,7 +178,8 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeTableModelTestIT {
       } catch (SQLException ignored) {
       }
 
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(0, showPipeResult.size());
     }
   }
@@ -249,7 +251,8 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeTableModelTestIT {
         fail(e.getMessage());
       }
 
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(2, showPipeResult.size());
     }
   }
@@ -315,7 +318,8 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeTableModelTestIT {
       } catch (SQLException ignored) {
       }
 
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
     }
   }
@@ -522,7 +526,8 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeTableModelTestIT {
       } catch (Exception ignored) {
       }
 
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(2, showPipeResult.size());
     }
   }
@@ -540,6 +545,7 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeTableModelTestIT {
       final Map<String, String> processorAttributes = new HashMap<>();
       final Map<String, String> connectorAttributes = new HashMap<>();
 
+      extractorAttributes.put("__system.sql-dialect", "table");
       extractorAttributes.put("extractor.database-name", "test");
       extractorAttributes.put("extractor.table-name", "test.*");
       extractorAttributes.put("extractor.inclusion", "data.insert");
@@ -578,10 +584,12 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeTableModelTestIT {
                   .setProcessorAttributes(processorAttributes));
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
 
-      List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(3, showPipeResult.size());
 
-      showPipeResult = client.showPipe(new 
TShowPipeReq().setPipeName("p1")).pipeInfoList;
+      showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true).setPipeName("p1")).pipeInfoList;
       Assert.assertTrue(showPipeResult.stream().anyMatch((o) -> 
o.id.equals("p1")));
       Assert.assertFalse(showPipeResult.stream().anyMatch((o) -> 
o.id.equals("p2")));
       Assert.assertFalse(showPipeResult.stream().anyMatch((o) -> 
o.id.equals("p3")));
@@ -589,7 +597,9 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeTableModelTestIT {
       // Show all pipes whose connector is also used by p1.
       // p1 and p2 share the same connector parameters, so they have the same 
connector.
       showPipeResult =
-          client.showPipe(new 
TShowPipeReq().setPipeName("p1").setWhereClause(true)).pipeInfoList;
+          client.showPipe(
+                  new 
TShowPipeReq().setIsTableModel(true).setPipeName("p1").setWhereClause(true))
+              .pipeInfoList;
       Assert.assertTrue(showPipeResult.stream().anyMatch((o) -> 
o.id.equals("p1")));
       Assert.assertTrue(showPipeResult.stream().anyMatch((o) -> 
o.id.equals("p2")));
       Assert.assertFalse(showPipeResult.stream().anyMatch((o) -> 
o.id.equals("p3")));
@@ -678,7 +688,8 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeTableModelTestIT {
         fail(e.getMessage());
       }
 
-      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setIsTableModel(true)).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
     }
   }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
index c7da1382535..a3c8c375742 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/TableModelUtils.java
@@ -37,6 +37,8 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.text.SimpleDateFormat;
 import java.time.LocalDate;
@@ -772,4 +774,19 @@ public class TableModelUtils {
 
     return tablet;
   }
+
+  public static int showPipesCount(final BaseEnv baseEnv, final String 
sqlDialect) {
+    try (final Connection connection = baseEnv.getConnection(sqlDialect);
+        final Statement statement = connection.createStatement()) {
+      final ResultSet resultSet = statement.executeQuery("show pipes");
+      int count = 0;
+      while (resultSet.next()) {
+        count++;
+      }
+      return count;
+    } catch (final SQLException e) {
+      fail(e.getMessage());
+    }
+    return 0;
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index 020690f2465..20cd76e4a41 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -94,6 +94,13 @@ public class PipeTableResp implements DataSet {
     }
   }
 
+  public PipeTableResp filter(
+      final Boolean whereClause, final String pipeName, final boolean 
isTableModel) {
+    final PipeTableResp resp = filter(whereClause, pipeName);
+    resp.allPipeMeta.removeIf(meta -> !meta.visibleUnder(isTableModel));
+    return resp;
+  }
+
   public TGetAllPipeInfoResp convertToTGetAllPipeInfoResp() throws IOException 
{
     final List<ByteBuffer> pipeInformationByteBuffers = new ArrayList<>();
     for (final PipeMeta pipeMeta : allPipeMeta) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 130f1dcd15c..6f6d625768c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -226,6 +226,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
@@ -2193,18 +2195,18 @@ public class ConfigManager implements IManager {
   }
 
   @Override
-  public TSStatus startPipe(String pipeName) {
+  public TSStatus startPipe(TStartPipeReq req) {
     TSStatus status = confirmLeader();
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        ? pipeManager.getPipeTaskCoordinator().startPipe(pipeName)
+        ? pipeManager.getPipeTaskCoordinator().startPipe(req)
         : status;
   }
 
   @Override
-  public TSStatus stopPipe(String pipeName) {
+  public TSStatus stopPipe(TStopPipeReq req) {
     TSStatus status = confirmLeader();
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        ? pipeManager.getPipeTaskCoordinator().stopPipe(pipeName)
+        ? pipeManager.getPipeTaskCoordinator().stopPipe(req)
         : status;
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 63e527440ed..82dae052ef2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -147,6 +147,8 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TShowTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
@@ -694,34 +696,39 @@ public interface IManager {
    * Alter Pipe.
    *
    * @param req Info about Pipe
-   * @return TSStatus
+   * @return {@link TSStatusCode#SUCCESS_STATUS} if altered the pipe 
successfully, {@link
+   *     TSStatusCode#PIPE_ERROR} if encountered failure, {@link 
TSStatusCode#PIPE_NOT_EXIST_ERROR}
+   *     if the pipe does not exist.
    */
   TSStatus alterPipe(TAlterPipeReq req);
 
   /**
    * Start Pipe.
    *
-   * @param pipeName name of Pipe
+   * @param req Info about Pipe
    * @return {@link TSStatusCode#SUCCESS_STATUS} if started the pipe 
successfully, {@link
-   *     TSStatusCode#PIPE_ERROR} if encountered failure.
+   *     TSStatusCode#PIPE_ERROR} if encountered failure, {@link 
TSStatusCode#PIPE_NOT_EXIST_ERROR}
+   *     if the pipe does not exist.
    */
-  TSStatus startPipe(String pipeName);
+  TSStatus startPipe(TStartPipeReq req);
 
   /**
    * Stop Pipe.
    *
-   * @param pipeName name of Pipe
+   * @param req Info about Pipe
    * @return {@link TSStatusCode#SUCCESS_STATUS} if stopped the pipe 
successfully, {@link
-   *     TSStatusCode#PIPE_ERROR} if encountered failure.
+   *     TSStatusCode#PIPE_ERROR} if encountered failure, {@link 
TSStatusCode#PIPE_NOT_EXIST_ERROR}
+   *     if the pipe does not exist.
    */
-  TSStatus stopPipe(String pipeName);
+  TSStatus stopPipe(TStopPipeReq req);
 
   /**
    * Drop Pipe.
    *
    * @param req Info about Pipe
    * @return {@link TSStatusCode#SUCCESS_STATUS} if dropped the pipe 
successfully, {@link
-   *     TSStatusCode#PIPE_ERROR} if encountered failure.
+   *     TSStatusCode#PIPE_ERROR} if encountered failure, {@link 
TSStatusCode#PIPE_NOT_EXIST_ERROR}
+   *     if the pipe does not exist.
    */
   TSStatus dropPipe(TDropPipeReq req);
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 7de39849e59..4aaf3ab46c3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -116,7 +118,7 @@ public class PipeTaskCoordinator {
 
   /** Caller should ensure that the method is called in the lock {@link 
#lock()}. */
   public TSStatus createPipe(TCreatePipeReq req) {
-    TSStatus status = null;
+    final TSStatus status;
     if (req.getPipeName().startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
       status = configManager.getProcedureManager().createConsensusPipe(req);
     } else {
@@ -130,6 +132,17 @@ public class PipeTaskCoordinator {
 
   /** Caller should ensure that the method is called in the lock {@link 
#lock()}. */
   public TSStatus alterPipe(TAlterPipeReq req) {
+    final String pipeName = req.getPipeName();
+    final boolean isSetIfExistsCondition =
+        req.isSetIfExistsCondition() && req.isIfExistsCondition();
+    if (!pipeTaskInfo.isPipeExisted(pipeName, req.isTableModel)) {
+      return isSetIfExistsCondition
+          ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+          : RpcUtils.getStatus(
+              TSStatusCode.PIPE_NOT_EXIST_ERROR,
+              String.format(
+                  "Failed to alter pipe %s. Failures: %s does not exist.", 
pipeName, pipeName));
+    }
     final TSStatus status = configManager.getProcedureManager().alterPipe(req);
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.warn("Failed to alter pipe {}. Result status: {}.", 
req.getPipeName(), status);
@@ -138,8 +151,8 @@ public class PipeTaskCoordinator {
   }
 
   /** Caller should ensure that the method is called in the lock {@link 
#lock()}. */
-  public TSStatus startPipe(String pipeName) {
-    TSStatus status = null;
+  private TSStatus startPipe(String pipeName) {
+    final TSStatus status;
     if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
       status = 
configManager.getProcedureManager().startConsensusPipe(pipeName);
     } else {
@@ -152,9 +165,21 @@ public class PipeTaskCoordinator {
   }
 
   /** Caller should ensure that the method is called in the lock {@link 
#lock()}. */
-  public TSStatus stopPipe(String pipeName) {
+  public TSStatus startPipe(TStartPipeReq req) {
+    final String pipeName = req.getPipeName();
+    if (!pipeTaskInfo.isPipeExisted(pipeName, req.isTableModel)) {
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR,
+          String.format(
+              "Failed to start pipe %s. Failures: %s does not exist.", 
pipeName, pipeName));
+    }
+    return startPipe(pipeName);
+  }
+
+  /** Caller should ensure that the method is called in the lock {@link 
#lock()}. */
+  private TSStatus stopPipe(String pipeName) {
     final boolean isStoppedByRuntimeException = 
pipeTaskInfo.isStoppedByRuntimeException(pipeName);
-    TSStatus status = null;
+    final TSStatus status;
     if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
       status = configManager.getProcedureManager().stopConsensusPipe(pipeName);
     } else {
@@ -176,11 +201,32 @@ public class PipeTaskCoordinator {
     return status;
   }
 
+  /** Caller should ensure that the method is called in the lock {@link 
#lock()}. */
+  public TSStatus stopPipe(TStopPipeReq req) {
+    final String pipeName = req.getPipeName();
+    if (!pipeTaskInfo.isPipeExisted(pipeName, req.isTableModel)) {
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPE_NOT_EXIST_ERROR,
+          String.format(
+              "Failed to stop pipe %s. Failures: %s does not exist.", 
pipeName, pipeName));
+    }
+    return stopPipe(pipeName);
+  }
+
   /** Caller should ensure that the method is called in the lock {@link 
#lock()}. */
   public TSStatus dropPipe(TDropPipeReq req) {
     final String pipeName = req.getPipeName();
-    final boolean isPipeExistedBeforeDrop = 
pipeTaskInfo.isPipeExisted(pipeName);
-    TSStatus status = null;
+    final boolean isSetIfExistsCondition =
+        req.isSetIfExistsCondition() && req.isIfExistsCondition();
+    if (!pipeTaskInfo.isPipeExisted(pipeName, req.isTableModel)) {
+      return isSetIfExistsCondition
+          ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+          : RpcUtils.getStatus(
+              TSStatusCode.PIPE_NOT_EXIST_ERROR,
+              String.format(
+                  "Failed to drop pipe %s. Failures: %s does not exist.", 
pipeName, pipeName));
+    }
+    final TSStatus status;
     if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
       status = configManager.getProcedureManager().dropConsensusPipe(pipeName);
     } else {
@@ -189,23 +235,13 @@ public class PipeTaskCoordinator {
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.warn("Failed to drop pipe {}. Result status: {}.", pipeName, 
status);
     }
-
-    final boolean isSetIfExistsCondition =
-        req.isSetIfExistsCondition() && req.isIfExistsCondition();
-    // If the `IF EXISTS` condition is not set and the pipe does not exist 
before the delete
-    // operation, return an error status indicating that the pipe does not 
exist.
-    return isPipeExistedBeforeDrop || isSetIfExistsCondition
-        ? status
-        : RpcUtils.getStatus(
-            TSStatusCode.PIPE_NOT_EXIST_ERROR,
-            String.format(
-                "Failed to drop pipe %s. Failures: %s does not exist.", 
pipeName, pipeName));
+    return status;
   }
 
   public TShowPipeResp showPipes(final TShowPipeReq req) {
     try {
       return ((PipeTableResp) configManager.getConsensusManager().read(new 
ShowPipePlanV2()))
-          .filter(req.whereClause, req.pipeName)
+          .filter(req.whereClause, req.pipeName, req.isTableModel)
           .convertToTShowPipeResp();
     } catch (final ConsensusException e) {
       LOGGER.warn("Failed in the read API executing the consensus layer due 
to: ", e);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 372209bfd74..8fe6659dc0c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -187,23 +187,19 @@ public class PipeTaskInfo implements SnapshotProcessor {
     throw new PipeException(exceptionMessage);
   }
 
-  public boolean checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq 
alterPipeRequest)
+  public void checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq 
alterPipeRequest)
       throws PipeException {
     acquireReadLock();
     try {
-      return checkAndUpdateRequestBeforeAlterPipeInternal(alterPipeRequest);
+      checkAndUpdateRequestBeforeAlterPipeInternal(alterPipeRequest);
     } finally {
       releaseReadLock();
     }
   }
 
-  private boolean checkAndUpdateRequestBeforeAlterPipeInternal(final 
TAlterPipeReq alterPipeRequest)
+  private void checkAndUpdateRequestBeforeAlterPipeInternal(final 
TAlterPipeReq alterPipeRequest)
       throws PipeException {
     if (!isPipeExisted(alterPipeRequest.getPipeName())) {
-      if (alterPipeRequest.isSetIfExistsCondition() && 
alterPipeRequest.isIfExistsCondition()) {
-        return false;
-      }
-
       final String exceptionMessage =
           String.format(
               "Failed to alter pipe %s, %s", alterPipeRequest.getPipeName(), 
PIPE_NOT_EXIST_MSG);
@@ -267,8 +263,6 @@ public class PipeTaskInfo implements SnapshotProcessor {
                 .getAttribute());
       }
     }
-
-    return true;
   }
 
   public void checkBeforeStartPipe(final String pipeName) throws PipeException 
{
@@ -351,6 +345,15 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
+  public boolean isPipeExisted(final String pipeName, final boolean 
isTableModel) {
+    acquireReadLock();
+    try {
+      return pipeMetaKeeper.containsPipeMeta(pipeName, isTableModel);
+    } finally {
+      releaseReadLock();
+    }
+  }
+
   private PipeStatus getPipeStatus(final String pipeName) {
     acquireReadLock();
     try {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 8b6aceb9c30..ba7d40abc53 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -96,9 +96,7 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
 
     // We should execute checkBeforeAlterPipe before checking the pipe plugin. 
This method will
     // update the alterPipeRequest based on the alterPipeRequest and existing 
pipe metadata.
-    if 
(!pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest)) {
-      return false;
-    }
+    pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest);
 
     final PipeManager pipeManager = env.getConfigManager().getPipeManager();
     pipeManager
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 2961a5f7c42..219b68813e6 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -200,6 +200,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
 import org.apache.iotdb.confignode.rpc.thrift.TTestOperation;
@@ -1113,12 +1115,22 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
 
   @Override
   public TSStatus startPipe(String pipeName) {
-    return configManager.startPipe(pipeName);
+    return configManager.startPipe(new TStartPipeReq().setPipeName(pipeName));
+  }
+
+  @Override
+  public TSStatus startPipeExtended(TStartPipeReq req) {
+    return configManager.startPipe(req);
   }
 
   @Override
   public TSStatus stopPipe(String pipeName) {
-    return configManager.stopPipe(pipeName);
+    return configManager.stopPipe(new TStopPipeReq().setPipeName(pipeName));
+  }
+
+  @Override
+  public TSStatus stopPipeExtended(TStopPipeReq req) {
+    return configManager.stopPipe(req);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index bf9a67061b4..0a033aff297 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
@@ -134,7 +135,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
             .getStringOrDefault(
                 SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE)
             .equals(SystemConstant.SQL_DIALECT_TREE_VALUE);
-    final boolean isTreeModelDataAllowedToBeCaptured =
+    final boolean isCaptureTree =
         validator
             .getParameters()
             .getBooleanOrDefault(
@@ -142,7 +143,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
                     PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY,
                     PipeExtractorConstant.SOURCE_CAPTURE_TREE_KEY),
                 isTreeDialect);
-    final boolean isTableModelDataAllowedToBeCaptured =
+    final boolean isCaptureTable =
         validator
             .getParameters()
             .getBooleanOrDefault(
@@ -150,6 +151,21 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
                     PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY,
                     PipeExtractorConstant.SOURCE_CAPTURE_TABLE_KEY),
                 !isTreeDialect);
+    if (!isCaptureTree && !isCaptureTable) {
+      throw new PipeParameterNotValidException(
+          "capture.tree and capture.table can not both be specified as false");
+    }
+
+    final boolean isDoubleLiving =
+        validator
+            .getParameters()
+            .getBooleanOrDefault(
+                Arrays.asList(
+                    PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+                    PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+                
PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+    final boolean isTreeModelDataAllowedToBeCaptured = isDoubleLiving || 
isCaptureTree;
+    final boolean isTableModelDataAllowedToBeCaptured = isDoubleLiving || 
isCaptureTable;
     if (!isTreeModelDataAllowedToBeCaptured
         && validator
             .getParameters()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 5fd09f22a9a..f50fbfd63bd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -163,6 +163,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
 import org.apache.iotdb.confignode.rpc.thrift.TTestOperation;
@@ -1054,12 +1056,24 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
         () -> client.startPipe(pipeName), status -> 
!updateConfigNodeLeader(status));
   }
 
+  @Override
+  public TSStatus startPipeExtended(TStartPipeReq req) throws TException {
+    return executeRemoteCallWithRetry(
+        () -> client.startPipeExtended(req), status -> 
!updateConfigNodeLeader(status));
+  }
+
   @Override
   public TSStatus stopPipe(String pipeName) throws TException {
     return executeRemoteCallWithRetry(
         () -> client.stopPipe(pipeName), status -> 
!updateConfigNodeLeader(status));
   }
 
+  @Override
+  public TSStatus stopPipeExtended(TStopPipeReq req) throws TException {
+    return executeRemoteCallWithRetry(
+        () -> client.stopPipeExtended(req), status -> 
!updateConfigNodeLeader(status));
+  }
+
   @Override
   public TSStatus dropPipe(String pipeName) throws TException {
     return executeRemoteCallWithRetry(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 4a14c71a3d3..3c41ea2ac2e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -133,6 +133,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -2044,6 +2046,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       req.setExtractorAttributes(alterPipeStatement.getExtractorAttributes());
       
req.setIsReplaceAllExtractorAttributes(alterPipeStatement.isReplaceAllExtractorAttributes());
       req.setIfExistsCondition(alterPipeStatement.hasIfExistsCondition());
+      req.setIsTableModel(alterPipeStatement.isTableModel());
       final TSStatus tsStatus = configNodeClient.alterPipe(req);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         future.setException(new IoTDBException(tsStatus.message, 
tsStatus.code));
@@ -2073,7 +2076,11 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
     try (final ConfigNodeClient configNodeClient =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final TSStatus tsStatus = 
configNodeClient.startPipe(startPipeStatement.getPipeName());
+      final TSStatus tsStatus =
+          configNodeClient.startPipeExtended(
+              new TStartPipeReq()
+                  .setPipeName(startPipeStatement.getPipeName())
+                  .setIsTableModel(startPipeStatement.isTableModel()));
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         future.setException(new IoTDBException(tsStatus.message, 
tsStatus.code));
       } else {
@@ -2106,7 +2113,8 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
           configNodeClient.dropPipeExtended(
               new TDropPipeReq()
                   .setPipeName(dropPipeStatement.getPipeName())
-                  
.setIfExistsCondition(dropPipeStatement.hasIfExistsCondition()));
+                  
.setIfExistsCondition(dropPipeStatement.hasIfExistsCondition())
+                  .setIsTableModel(dropPipeStatement.isTableModel()));
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         future.setException(new IoTDBException(tsStatus.message, 
tsStatus.code));
       } else {
@@ -2135,7 +2143,12 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
     try (final ConfigNodeClient configNodeClient =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final TSStatus tsStatus = 
configNodeClient.stopPipe(stopPipeStatement.getPipeName());
+
+      final TSStatus tsStatus =
+          configNodeClient.stopPipeExtended(
+              new TStopPipeReq()
+                  .setPipeName(stopPipeStatement.getPipeName())
+                  .setIsTableModel(stopPipeStatement.isTableModel()));
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         future.setException(new IoTDBException(tsStatus.message, 
tsStatus.code));
       } else {
@@ -2159,6 +2172,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       if (showPipesStatement.getWhereClause()) {
         tShowPipeReq.setWhereClause(true);
       }
+      tShowPipeReq.setIsTableModel(showPipesStatement.isTableModel());
       final List<TShowPipeInfo> tShowPipeInfoList =
           configNodeClient.showPipe(tShowPipeReq).getPipeInfoList();
       ShowPipeTask.buildTSBlock(tShowPipeInfoList, future);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
index f6d0ebab2cb..159c6941628 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
@@ -37,13 +37,13 @@ public class AlterPipeTask implements IConfigTask {
 
   private final AlterPipeStatement alterPipeStatement;
 
-  public AlterPipeTask(AlterPipeStatement alterPipeStatement) {
+  public AlterPipeTask(final AlterPipeStatement alterPipeStatement) {
     // support now() function
     
applyNowFunctionToExtractorAttributes(alterPipeStatement.getExtractorAttributes());
     this.alterPipeStatement = alterPipeStatement;
   }
 
-  public AlterPipeTask(AlterPipe node) {
+  public AlterPipeTask(final AlterPipe node) {
     alterPipeStatement = new AlterPipeStatement(StatementType.ALTER_PIPE);
     alterPipeStatement.setPipeName(node.getPipeName());
     alterPipeStatement.setIfExists(node.hasIfExistsCondition());
@@ -57,10 +57,12 @@ public class AlterPipeTask implements IConfigTask {
     
alterPipeStatement.setReplaceAllExtractorAttributes(node.isReplaceAllExtractorAttributes());
     
alterPipeStatement.setReplaceAllProcessorAttributes(node.isReplaceAllProcessorAttributes());
     
alterPipeStatement.setReplaceAllConnectorAttributes(node.isReplaceAllConnectorAttributes());
+
+    alterPipeStatement.setTableModel(true);
   }
 
   @Override
-  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
+  public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
     return configTaskExecutor.alterPipe(alterPipeStatement);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
index a28ca91f3c6..a10c6042241 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/DropPipeTask.java
@@ -32,18 +32,19 @@ public class DropPipeTask implements IConfigTask {
 
   private final DropPipeStatement dropPipeStatement;
 
-  public DropPipeTask(DropPipeStatement dropPipeStatement) {
+  public DropPipeTask(final DropPipeStatement dropPipeStatement) {
     this.dropPipeStatement = dropPipeStatement;
   }
 
-  public DropPipeTask(DropPipe node) {
+  public DropPipeTask(final DropPipe node) {
     dropPipeStatement = new DropPipeStatement(StatementType.DROP_PIPE);
     dropPipeStatement.setPipeName(node.getPipeName());
     dropPipeStatement.setIfExists(node.hasIfExistsCondition());
+    dropPipeStatement.setTableModel(true);
   }
 
   @Override
-  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
+  public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
     return configTaskExecutor.dropPipe(dropPipeStatement);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
index 6481d0713ac..c43a9953b58 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
@@ -52,10 +52,11 @@ public class ShowPipeTask implements IConfigTask {
     this.showPipesStatement = showPipesStatement;
   }
 
-  public ShowPipeTask(ShowPipes node) {
+  public ShowPipeTask(final ShowPipes node) {
     showPipesStatement = new ShowPipesStatement();
     showPipesStatement.setPipeName(node.getPipeName());
     showPipesStatement.setWhereClause(node.hasWhereClause());
+    showPipesStatement.setTableModel(true);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
index 39ff162a43c..c118f0a3a58 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StartPipeTask.java
@@ -32,17 +32,18 @@ public class StartPipeTask implements IConfigTask {
 
   private final StartPipeStatement startPipeStatement;
 
-  public StartPipeTask(StartPipeStatement startPipeStatement) {
+  public StartPipeTask(final StartPipeStatement startPipeStatement) {
     this.startPipeStatement = startPipeStatement;
   }
 
-  public StartPipeTask(StartPipe node) {
+  public StartPipeTask(final StartPipe node) {
     startPipeStatement = new StartPipeStatement(StatementType.START_PIPE);
     startPipeStatement.setPipeName(node.getPipeName());
+    startPipeStatement.setTableModel(true);
   }
 
   @Override
-  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
+  public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
     return configTaskExecutor.startPipe(startPipeStatement);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
index 04806bef42f..91764c43263 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/StopPipeTask.java
@@ -32,17 +32,18 @@ public class StopPipeTask implements IConfigTask {
 
   private final StopPipeStatement stopPipeStatement;
 
-  public StopPipeTask(StopPipeStatement stopPipeStatement) {
+  public StopPipeTask(final StopPipeStatement stopPipeStatement) {
     this.stopPipeStatement = stopPipeStatement;
   }
 
-  public StopPipeTask(StopPipe node) {
+  public StopPipeTask(final StopPipe node) {
     stopPipeStatement = new StopPipeStatement(StatementType.STOP_PIPE);
     stopPipeStatement.setPipeName(node.getPipeName());
+    stopPipeStatement.setTableModel(true);
   }
 
   @Override
-  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
+  public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
     return configTaskExecutor.stopPipe(stopPipeStatement);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterPipe.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterPipe.java
index 4d398b55fcc..a40fab917a5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterPipe.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterPipe.java
@@ -88,7 +88,7 @@ public class AlterPipe extends PipeStatement {
   }
 
   @Override
-  public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+  public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
     return visitor.visitAlterPipe(this, context);
   }
 
@@ -106,25 +106,25 @@ public class AlterPipe extends PipeStatement {
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    AlterPipe alterPipe = (AlterPipe) obj;
-    return Objects.equals(pipeName, alterPipe.pipeName)
-        && Objects.equals(ifExistsCondition, alterPipe.ifExistsCondition)
-        && Objects.equals(extractorAttributes, alterPipe.extractorAttributes)
-        && Objects.equals(processorAttributes, alterPipe.processorAttributes)
-        && Objects.equals(connectorAttributes, alterPipe.connectorAttributes)
+    final AlterPipe that = (AlterPipe) obj;
+    return Objects.equals(this.pipeName, that.pipeName)
+        && Objects.equals(this.ifExistsCondition, that.ifExistsCondition)
+        && Objects.equals(this.extractorAttributes, that.extractorAttributes)
+        && Objects.equals(this.processorAttributes, that.processorAttributes)
+        && Objects.equals(this.connectorAttributes, that.connectorAttributes)
         && Objects.equals(
-            isReplaceAllExtractorAttributes, 
alterPipe.isReplaceAllExtractorAttributes)
+            this.isReplaceAllExtractorAttributes, 
that.isReplaceAllExtractorAttributes)
         && Objects.equals(
-            isReplaceAllProcessorAttributes, 
alterPipe.isReplaceAllProcessorAttributes)
+            this.isReplaceAllProcessorAttributes, 
that.isReplaceAllProcessorAttributes)
         && Objects.equals(
-            isReplaceAllConnectorAttributes, 
alterPipe.isReplaceAllConnectorAttributes);
+            this.isReplaceAllConnectorAttributes, 
that.isReplaceAllConnectorAttributes);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
index 6952c06d13d..1b3837ba214 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
@@ -43,7 +43,7 @@ public class DropPipe extends PipeStatement {
   }
 
   @Override
-  public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+  public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
     return visitor.visitDropPipe(this, context);
   }
 
@@ -53,16 +53,16 @@ public class DropPipe extends PipeStatement {
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    DropPipe other = (DropPipe) obj;
-    return Objects.equals(pipeName, other.pipeName)
-        && Objects.equals(ifExistsCondition, other.ifExistsCondition);
+    final DropPipe that = (DropPipe) obj;
+    return Objects.equals(this.pipeName, that.pipeName)
+        && Objects.equals(this.ifExistsCondition, that.ifExistsCondition);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowPipes.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowPipes.java
index e6507961093..c0dfe9379de 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowPipes.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowPipes.java
@@ -44,7 +44,7 @@ public class ShowPipes extends PipeStatement {
   }
 
   @Override
-  public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+  public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
     return visitor.visitShowPipes(this, context);
   }
 
@@ -54,16 +54,16 @@ public class ShowPipes extends PipeStatement {
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    ShowPipes other = (ShowPipes) obj;
-    return Objects.equals(pipeName, other.pipeName)
-        && Objects.equals(hasWhereClause, other.hasWhereClause);
+    final ShowPipes that = (ShowPipes) obj;
+    return Objects.equals(this.pipeName, that.pipeName)
+        && Objects.equals(this.hasWhereClause, that.hasWhereClause);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java
index 540d6fef9ec..15eb5b2413d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java
@@ -37,7 +37,7 @@ public class StartPipe extends PipeStatement {
   }
 
   @Override
-  public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+  public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
     return visitor.visitStartPipe(this, context);
   }
 
@@ -47,15 +47,15 @@ public class StartPipe extends PipeStatement {
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    StartPipe other = (StartPipe) obj;
-    return Objects.equals(pipeName, other.pipeName);
+    final StartPipe that = (StartPipe) obj;
+    return Objects.equals(this.pipeName, that.pipeName);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
index daa9ab32e02..6d1a629de88 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
@@ -37,7 +37,7 @@ public class StopPipe extends PipeStatement {
   }
 
   @Override
-  public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+  public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
     return visitor.visitStopPipe(this, context);
   }
 
@@ -47,15 +47,15 @@ public class StopPipe extends PipeStatement {
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    StopPipe other = (StopPipe) obj;
-    return Objects.equals(pipeName, other.pipeName);
+    final StopPipe that = (StopPipe) obj;
+    return Objects.equals(this.pipeName, that.pipeName);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
index de5dc3c1d59..44065cf3185 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
@@ -44,8 +44,9 @@ public class AlterPipeStatement extends Statement implements 
IConfigStatement {
   private boolean isReplaceAllExtractorAttributes;
   private boolean isReplaceAllProcessorAttributes;
   private boolean isReplaceAllConnectorAttributes;
+  private boolean isTableModel;
 
-  public AlterPipeStatement(StatementType alterPipeStatement) {
+  public AlterPipeStatement(final StatementType alterPipeStatement) {
     this.statementType = alterPipeStatement;
   }
 
@@ -81,38 +82,46 @@ public class AlterPipeStatement extends Statement 
implements IConfigStatement {
     return isReplaceAllConnectorAttributes;
   }
 
-  public void setPipeName(String pipeName) {
+  public boolean isTableModel() {
+    return isTableModel;
+  }
+
+  public void setPipeName(final String pipeName) {
     this.pipeName = pipeName;
   }
 
-  public void setIfExists(boolean ifExistsCondition) {
+  public void setIfExists(final boolean ifExistsCondition) {
     this.ifExistsCondition = ifExistsCondition;
   }
 
-  public void setExtractorAttributes(Map<String, String> extractorAttributes) {
+  public void setExtractorAttributes(final Map<String, String> 
extractorAttributes) {
     this.extractorAttributes = extractorAttributes;
   }
 
-  public void setProcessorAttributes(Map<String, String> processorAttributes) {
+  public void setProcessorAttributes(final Map<String, String> 
processorAttributes) {
     this.processorAttributes = processorAttributes;
   }
 
-  public void setConnectorAttributes(Map<String, String> connectorAttributes) {
+  public void setConnectorAttributes(final Map<String, String> 
connectorAttributes) {
     this.connectorAttributes = connectorAttributes;
   }
 
-  public void setReplaceAllExtractorAttributes(boolean 
replaceAllExtractorAttributes) {
+  public void setReplaceAllExtractorAttributes(final boolean 
replaceAllExtractorAttributes) {
     isReplaceAllExtractorAttributes = replaceAllExtractorAttributes;
   }
 
-  public void setReplaceAllProcessorAttributes(boolean 
replaceAllProcessorAttributes) {
+  public void setReplaceAllProcessorAttributes(final boolean 
replaceAllProcessorAttributes) {
     isReplaceAllProcessorAttributes = replaceAllProcessorAttributes;
   }
 
-  public void setReplaceAllConnectorAttributes(boolean 
replaceAllConnectorAttributes) {
+  public void setReplaceAllConnectorAttributes(final boolean 
replaceAllConnectorAttributes) {
     isReplaceAllConnectorAttributes = replaceAllConnectorAttributes;
   }
 
+  public void setTableModel(final boolean tableModel) {
+    this.isTableModel = tableModel;
+  }
+
   @Override
   public QueryType getQueryType() {
     return QueryType.WRITE;
@@ -124,7 +133,7 @@ public class AlterPipeStatement extends Statement 
implements IConfigStatement {
   }
 
   @Override
-  public TSStatus checkPermissionBeforeProcess(String userName) {
+  public TSStatus checkPermissionBeforeProcess(final String userName) {
     if (AuthorityChecker.SUPER_USER.equals(userName)) {
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     }
@@ -134,7 +143,7 @@ public class AlterPipeStatement extends Statement 
implements IConfigStatement {
   }
 
   @Override
-  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+  public <R, C> R accept(final StatementVisitor<R, C> visitor, final C 
context) {
     return visitor.visitAlterPipe(this, context);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
index a3403e00e68..c61a15d777d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
@@ -37,27 +37,36 @@ public class DropPipeStatement extends Statement implements 
IConfigStatement {
 
   private String pipeName;
   private boolean ifExistsCondition;
+  private boolean isTableModel;
 
-  public DropPipeStatement(StatementType dropPipeStatement) {
+  public DropPipeStatement(final StatementType dropPipeStatement) {
     this.statementType = dropPipeStatement;
   }
 
+  public String getPipeName() {
+    return pipeName;
+  }
+
   public boolean hasIfExistsCondition() {
     return ifExistsCondition;
   }
 
-  public String getPipeName() {
-    return pipeName;
+  public boolean isTableModel() {
+    return isTableModel;
   }
 
-  public void setPipeName(String pipeName) {
+  public void setPipeName(final String pipeName) {
     this.pipeName = pipeName;
   }
 
-  public void setIfExists(boolean ifExistsCondition) {
+  public void setIfExists(final boolean ifExistsCondition) {
     this.ifExistsCondition = ifExistsCondition;
   }
 
+  public void setTableModel(final boolean tableModel) {
+    this.isTableModel = tableModel;
+  }
+
   @Override
   public QueryType getQueryType() {
     return QueryType.WRITE;
@@ -69,7 +78,7 @@ public class DropPipeStatement extends Statement implements 
IConfigStatement {
   }
 
   @Override
-  public TSStatus checkPermissionBeforeProcess(String userName) {
+  public TSStatus checkPermissionBeforeProcess(final String userName) {
     if (AuthorityChecker.SUPER_USER.equals(userName)) {
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     }
@@ -79,7 +88,7 @@ public class DropPipeStatement extends Statement implements 
IConfigStatement {
   }
 
   @Override
-  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+  public <R, C> R accept(final StatementVisitor<R, C> visitor, final C 
context) {
     return visitor.visitDropPipe(this, context);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/ShowPipesStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/ShowPipesStatement.java
index 54fe1099f39..658f36a0635 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/ShowPipesStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/ShowPipesStatement.java
@@ -39,6 +39,8 @@ public class ShowPipesStatement extends ShowStatement 
implements IConfigStatemen
 
   private boolean whereClause;
 
+  private boolean isTableModel;
+
   public String getPipeName() {
     return pipeName;
   }
@@ -47,21 +49,29 @@ public class ShowPipesStatement extends ShowStatement 
implements IConfigStatemen
     return whereClause;
   }
 
-  public void setPipeName(String pipeName) {
+  public boolean isTableModel() {
+    return isTableModel;
+  }
+
+  public void setPipeName(final String pipeName) {
     this.pipeName = pipeName;
   }
 
-  public void setWhereClause(boolean whereClause) {
+  public void setWhereClause(final boolean whereClause) {
     this.whereClause = whereClause;
   }
 
+  public void setTableModel(final boolean tableModel) {
+    this.isTableModel = tableModel;
+  }
+
   @Override
   public QueryType getQueryType() {
     return QueryType.READ;
   }
 
   @Override
-  public TSStatus checkPermissionBeforeProcess(String userName) {
+  public TSStatus checkPermissionBeforeProcess(final String userName) {
     if (AuthorityChecker.SUPER_USER.equals(userName)) {
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     }
@@ -71,7 +81,7 @@ public class ShowPipesStatement extends ShowStatement 
implements IConfigStatemen
   }
 
   @Override
-  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+  public <R, C> R accept(final StatementVisitor<R, C> visitor, final C 
context) {
     return visitor.visitShowPipes(this, context);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StartPipeStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StartPipeStatement.java
index 43b61a1fb44..9023d77c06e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StartPipeStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StartPipeStatement.java
@@ -36,8 +36,9 @@ import java.util.List;
 public class StartPipeStatement extends Statement implements IConfigStatement {
 
   private String pipeName;
+  private boolean isTableModel;
 
-  public StartPipeStatement(StatementType startPipeStatement) {
+  public StartPipeStatement(final StatementType startPipeStatement) {
     this.statementType = startPipeStatement;
   }
 
@@ -45,10 +46,18 @@ public class StartPipeStatement extends Statement 
implements IConfigStatement {
     return pipeName;
   }
 
-  public void setPipeName(String pipeName) {
+  public boolean isTableModel() {
+    return isTableModel;
+  }
+
+  public void setPipeName(final String pipeName) {
     this.pipeName = pipeName;
   }
 
+  public void setTableModel(final boolean tableModel) {
+    this.isTableModel = tableModel;
+  }
+
   @Override
   public QueryType getQueryType() {
     return QueryType.WRITE;
@@ -60,7 +69,7 @@ public class StartPipeStatement extends Statement implements 
IConfigStatement {
   }
 
   @Override
-  public TSStatus checkPermissionBeforeProcess(String userName) {
+  public TSStatus checkPermissionBeforeProcess(final String userName) {
     if (AuthorityChecker.SUPER_USER.equals(userName)) {
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     }
@@ -70,7 +79,7 @@ public class StartPipeStatement extends Statement implements 
IConfigStatement {
   }
 
   @Override
-  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+  public <R, C> R accept(final StatementVisitor<R, C> visitor, final C 
context) {
     return visitor.visitStartPipe(this, context);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StopPipeStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StopPipeStatement.java
index d463fda199d..ea14a3dbd1b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StopPipeStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/StopPipeStatement.java
@@ -36,8 +36,9 @@ import java.util.List;
 public class StopPipeStatement extends Statement implements IConfigStatement {
 
   private String pipeName;
+  private boolean isTableModel;
 
-  public StopPipeStatement(StatementType stopPipeStatement) {
+  public StopPipeStatement(final StatementType stopPipeStatement) {
     this.statementType = stopPipeStatement;
   }
 
@@ -45,10 +46,18 @@ public class StopPipeStatement extends Statement implements 
IConfigStatement {
     return pipeName;
   }
 
-  public void setPipeName(String pipeName) {
+  public boolean isTableModel() {
+    return isTableModel;
+  }
+
+  public void setPipeName(final String pipeName) {
     this.pipeName = pipeName;
   }
 
+  public void setTableModel(final boolean tableModel) {
+    this.isTableModel = tableModel;
+  }
+
   @Override
   public QueryType getQueryType() {
     return QueryType.WRITE;
@@ -60,7 +69,7 @@ public class StopPipeStatement extends Statement implements 
IConfigStatement {
   }
 
   @Override
-  public TSStatus checkPermissionBeforeProcess(String userName) {
+  public TSStatus checkPermissionBeforeProcess(final String userName) {
     if (AuthorityChecker.SUPER_USER.equals(userName)) {
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     }
@@ -70,7 +79,7 @@ public class StopPipeStatement extends Statement implements 
IConfigStatement {
   }
 
   @Override
-  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+  public <R, C> R accept(final StatementVisitor<R, C> visitor, final C 
context) {
     return visitor.visitStopPipe(this, context);
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
index 997278010e9..033fe650582 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
@@ -19,6 +19,10 @@
 
 package org.apache.iotdb.commons.pipe.agent.task.meta;
 
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
 import org.apache.tsfile.utils.PublicBAOS;
 
 import java.io.DataOutputStream;
@@ -26,6 +30,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Objects;
 
 public class PipeMeta {
@@ -61,6 +66,41 @@ public class PipeMeta {
     return temporaryMeta;
   }
 
+  public boolean visibleUnder(final boolean isTableModel) {
+    final PipeParameters extractorParameters = 
getStaticMeta().getExtractorParameters();
+
+    // visible under all model when 'mode.double-living' is set to true
+    final boolean isDoubleLiving =
+        extractorParameters.getBooleanOrDefault(
+            Arrays.asList(
+                PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+                PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+            PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+    if (isDoubleLiving) {
+      return true;
+    }
+
+    final boolean isTreeDialect =
+        extractorParameters
+            .getStringOrDefault(
+                SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE)
+            .equals(SystemConstant.SQL_DIALECT_TREE_VALUE);
+    final Boolean _isCaptureTree =
+        extractorParameters.getBooleanByKeys(
+            PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY,
+            PipeExtractorConstant.SOURCE_CAPTURE_TREE_KEY);
+    final boolean isCaptureTree = Objects.nonNull(_isCaptureTree) ? 
_isCaptureTree : isTreeDialect;
+    final Boolean _isCaptureTable =
+        extractorParameters.getBooleanByKeys(
+            PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY,
+            PipeExtractorConstant.SOURCE_CAPTURE_TABLE_KEY);
+    final boolean isCaptureTable =
+        Objects.nonNull(_isCaptureTable) ? _isCaptureTable : !isTreeDialect;
+
+    // visible under specific tree or table model <-> actually capture tree or 
table data
+    return isTableModel ? isCaptureTable : isCaptureTree;
+  }
+
   public ByteBuffer serialize() throws IOException {
     final PublicBAOS byteArrayOutputStream = new PublicBAOS();
     final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMetaKeeper.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMetaKeeper.java
index f8a8c9e55ac..4009288dfc7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMetaKeeper.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMetaKeeper.java
@@ -85,6 +85,14 @@ public class PipeMetaKeeper {
     return pipeNameToPipeMetaMap.containsKey(pipeName);
   }
 
+  public boolean containsPipeMeta(String pipeName, boolean isTableModel) {
+    final PipeMeta pipeMeta = pipeNameToPipeMetaMap.get(pipeName);
+    if (Objects.isNull(pipeMeta)) {
+      return false;
+    }
+    return pipeMeta.visibleUnder(isTableModel);
+  }
+
   public Iterable<PipeMeta> getPipeMetaList() {
     return pipeNameToPipeMetaMap.values();
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index 1335b995a59..d5d34c26920 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -119,6 +119,9 @@ public class PipeExtractorConstant {
   public static final String EXTRACTOR_MODE_SNAPSHOT_KEY = 
"extractor.mode.snapshot";
   public static final String SOURCE_MODE_SNAPSHOT_KEY = "source.mode.snapshot";
   public static final boolean EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE = false;
+  public static final String EXTRACTOR_MODE_DOUBLE_LIVING_KEY = 
"extractor.mode.double-living";
+  public static final String SOURCE_MODE_DOUBLE_LIVING_KEY = 
"source.mode.double-living";
+  public static final boolean EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE = 
false;
 
   public static final String EXTRACTOR_START_TIME_KEY = "extractor.start-time";
   public static final String SOURCE_START_TIME_KEY = "source.start-time";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
index 963595c62a6..00462cb5eb4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TablePattern.java
@@ -102,15 +102,22 @@ public class TablePattern {
    */
   public static TablePattern parsePipePatternFromSourceParameters(
       final PipeParameters sourceParameters) {
-    final boolean isTableModelDataAllowedToBeCaptured =
+    final boolean isDoubleLiving =
         sourceParameters.getBooleanOrDefault(
             Arrays.asList(
-                PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY,
-                PipeExtractorConstant.SOURCE_CAPTURE_TABLE_KEY),
-            !sourceParameters
-                .getStringOrDefault(
-                    SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE)
-                .equals(SystemConstant.SQL_DIALECT_TREE_VALUE));
+                PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+                PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+            PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+    final boolean isTableModelDataAllowedToBeCaptured =
+        isDoubleLiving
+            || sourceParameters.getBooleanOrDefault(
+                Arrays.asList(
+                    PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY,
+                    PipeExtractorConstant.SOURCE_CAPTURE_TABLE_KEY),
+                !sourceParameters
+                    .getStringOrDefault(
+                        SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE)
+                    .equals(SystemConstant.SQL_DIALECT_TREE_VALUE));
     final String databaseNamePattern =
         sourceParameters.getStringOrDefault(
             Arrays.asList(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
index 7ee8f6dd88d..f03f0a29186 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
@@ -70,15 +70,23 @@ public abstract class TreePattern {
    */
   public static TreePattern parsePipePatternFromSourceParameters(
       final PipeParameters sourceParameters) {
-    final boolean isTreeModelDataAllowedToBeCaptured =
+    final boolean isDoubleLiving =
         sourceParameters.getBooleanOrDefault(
             Arrays.asList(
-                PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY,
-                PipeExtractorConstant.SOURCE_CAPTURE_TREE_KEY),
-            sourceParameters
-                .getStringOrDefault(
-                    SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE)
-                .equals(SystemConstant.SQL_DIALECT_TREE_VALUE));
+                PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+                PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+            PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+
+    final boolean isTreeModelDataAllowedToBeCaptured =
+        isDoubleLiving
+            || sourceParameters.getBooleanOrDefault(
+                Arrays.asList(
+                    PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY,
+                    PipeExtractorConstant.SOURCE_CAPTURE_TREE_KEY),
+                sourceParameters
+                    .getStringOrDefault(
+                        SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE)
+                    .equals(SystemConstant.SQL_DIALECT_TREE_VALUE));
 
     final String path = sourceParameters.getStringByKeys(EXTRACTOR_PATH_KEY, 
SOURCE_PATH_KEY);
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java
index 2d3a1615f00..cecee61c7fa 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java
@@ -26,8 +26,10 @@ import org.apache.iotdb.pipe.api.PipeExtractor;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 
 import java.util.Arrays;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE;
@@ -85,6 +87,51 @@ public abstract class IoTDBExtractor implements 
PipeExtractor {
                 .getStringOrDefault(
                     Arrays.asList(EXTRACTOR_EXCLUSION_KEY, 
SOURCE_EXCLUSION_KEY),
                     EXTRACTOR_EXCLUSION_DEFAULT_VALUE));
+
+    // Validate double living
+    validateDoubleLiving(validator.getParameters());
+  }
+
+  private void validateDoubleLiving(final PipeParameters parameters) {
+    final boolean isDoubleLiving =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(
+                PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+                PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+            PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+    if (!isDoubleLiving) {
+      return;
+    }
+
+    // check 'capture.tree'
+    final Boolean isCaptureTree =
+        parameters.getBooleanByKeys(
+            PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY,
+            PipeExtractorConstant.SOURCE_CAPTURE_TREE_KEY);
+    if (Objects.nonNull(isCaptureTree) && !isCaptureTree) {
+      throw new PipeParameterNotValidException(
+          "capture.tree can not be specified to false when double living is 
enabled");
+    }
+
+    // check 'capture.table'
+    final Boolean isCaptureTable =
+        parameters.getBooleanByKeys(
+            PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY,
+            PipeExtractorConstant.SOURCE_CAPTURE_TABLE_KEY);
+    if (Objects.nonNull(isCaptureTable) && !isCaptureTable) {
+      throw new PipeParameterNotValidException(
+          "capture.table can not be specified to false when double living is 
enabled");
+    }
+
+    // check 'forwarding-pipe-requests'
+    final Boolean isForwardingPipeRequests =
+        parameters.getBooleanByKeys(
+            PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+            PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY);
+    if (Objects.nonNull(isForwardingPipeRequests) && isForwardingPipeRequests) 
{
+      throw new PipeParameterNotValidException(
+          "forwarding-pipe-requests can not be specified to true when double 
living is enabled");
+    }
   }
 
   @Override
@@ -99,12 +146,22 @@ public abstract class IoTDBExtractor implements 
PipeExtractor {
     taskID = pipeName + "_" + regionId + "_" + creationTime;
     pipeTaskMeta = environment.getPipeTaskMeta();
 
-    isForwardingPipeRequests =
+    final boolean isDoubleLiving =
         parameters.getBooleanOrDefault(
             Arrays.asList(
-                PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
-                PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
-            
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+                PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+                PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+            PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+    if (isDoubleLiving) {
+      isForwardingPipeRequests = false;
+    } else {
+      isForwardingPipeRequests =
+          parameters.getBooleanOrDefault(
+              Arrays.asList(
+                  PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+                  PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
+              
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+    }
   }
 
   @Override
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 26b1b426082..ebe67399552 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -751,11 +751,23 @@ struct TAlterPipeReq {
     6: optional map<string, string> extractorAttributes
     7: optional bool isReplaceAllExtractorAttributes
     8: optional bool ifExistsCondition
+    9: optional bool isTableModel
+}
+
+struct TStartPipeReq {
+    1: required string pipeName
+    2: optional bool isTableModel
+}
+
+struct TStopPipeReq {
+    1: required string pipeName
+    2: optional bool isTableModel
 }
 
 struct TDropPipeReq {
     1: required string pipeName
     2: optional bool ifExistsCondition
+    3: optional bool isTableModel
 }
 
 // Deprecated, restored for compatibility
@@ -768,6 +780,7 @@ struct TPipeSinkInfo {
 struct TShowPipeReq {
   1: optional string pipeName
   2: optional bool whereClause
+  3: optional bool isTableModel
 }
 
 struct TShowPipeResp {
@@ -1680,9 +1693,15 @@ service IConfigNodeRPCService {
   /** Start Pipe */
   common.TSStatus startPipe(string pipeName)
 
+  /** Start Pipe */
+  common.TSStatus startPipeExtended(TStartPipeReq req)
+
   /** Stop Pipe */
   common.TSStatus stopPipe(string pipeName)
 
+  /** Stop Pipe */
+  common.TSStatus stopPipeExtended(TStopPipeReq req)
+
   /** Drop Pipe */
   common.TSStatus dropPipe(string pipeName)
 

Reply via email to