This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 29a68365b86 Subscription: tree/table model isolation for 
topic/consumer/subscription entities (#15484)
29a68365b86 is described below

commit 29a68365b8612613eeb222746cb1d5b9061e9a27
Author: VGalaxies <[email protected]>
AuthorDate: Mon May 12 17:10:18 2025 +0800

    Subscription: tree/table model isolation for topic/consumer/subscription 
entities (#15484)
---
 .../tablemodel/IoTDBSubscriptionIsolationIT.java   | 218 +++++++++++++++++++++
 .../rpc/subscription/config/ConsumerConfig.java    |   4 +
 .../rpc/subscription/config/ConsumerConstant.java  |   3 +
 .../base/AbstractSubscriptionProvider.java         |   1 +
 .../subscription/SubscriptionTableResp.java        |  29 +--
 .../response/subscription/TopicTableResp.java      |  25 ++-
 .../subscription/SubscriptionCoordinator.java      |  29 ++-
 .../persistence/subscription/SubscriptionInfo.java |  42 +++-
 .../subscription/CreateSubscriptionProcedure.java  |  10 -
 .../InformationSchemaContentSupplierFactory.java   |   8 +-
 .../config/executor/ClusterConfigTaskExecutor.java |   5 +-
 .../config/sys/subscription/DropTopicTask.java     |   1 +
 .../sys/subscription/ShowSubscriptionsTask.java    |   1 +
 .../config/sys/subscription/ShowTopicsTask.java    |   1 +
 .../metadata/subscription/DropTopicStatement.java  |  17 +-
 .../subscription/ShowSubscriptionsStatement.java   |  15 +-
 .../metadata/subscription/ShowTopicsStatement.java |  15 +-
 .../receiver/SubscriptionReceiverV1.java           |  10 +-
 .../datastructure/visibility/VisibilityUtils.java  |  10 +
 .../meta/subscription/SubscriptionMeta.java        |  20 +-
 .../commons/subscription/meta/topic/TopicMeta.java |   9 +
 .../subscription/meta/topic/TopicMetaKeeper.java   |   8 +
 .../src/main/thrift/confignode.thrift              |   5 +
 23 files changed, 399 insertions(+), 87 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/tablemodel/IoTDBSubscriptionIsolationIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/tablemodel/IoTDBSubscriptionIsolationIT.java
new file mode 100644
index 00000000000..7a53bf08744
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/tablemodel/IoTDBSubscriptionIsolationIT.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.local.tablemodel;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.session.subscription.ISubscriptionTableSession;
+import org.apache.iotdb.session.subscription.ISubscriptionTreeSession;
+import org.apache.iotdb.session.subscription.SubscriptionTableSessionBuilder;
+import org.apache.iotdb.session.subscription.SubscriptionTreeSessionBuilder;
+import 
org.apache.iotdb.session.subscription.consumer.ISubscriptionTablePullConsumer;
+import 
org.apache.iotdb.session.subscription.consumer.ISubscriptionTreePullConsumer;
+import 
org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumerBuilder;
+import 
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumerBuilder;
+import org.apache.iotdb.subscription.it.local.AbstractSubscriptionLocalIT;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBSubscriptionIsolationIT extends AbstractSubscriptionLocalIT {
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Test
+  public void testTopicIsolation() throws Exception {
+    final String treeTopicName = "treeTopic";
+    final String tableTopicName = "tableTopic";
+
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+    // create tree topic
+    try (final ISubscriptionTreeSession session =
+        new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
+      session.open();
+      session.createTopic(treeTopicName);
+    }
+
+    // create table topic
+    try (final ISubscriptionTableSession session =
+        new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+      session.createTopic(tableTopicName);
+    }
+
+    // show topic on tree session
+    try (final ISubscriptionTreeSession session =
+        new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
+      session.open();
+      Assert.assertEquals(1, session.getTopics().size());
+      Assert.assertTrue(session.getTopic(treeTopicName).isPresent());
+      Assert.assertFalse(session.getTopic(tableTopicName).isPresent());
+    }
+
+    // show topic on table session
+    try (final ISubscriptionTableSession session =
+        new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+      Assert.assertEquals(1, session.getTopics().size());
+      Assert.assertTrue(session.getTopic(tableTopicName).isPresent());
+      Assert.assertFalse(session.getTopic(treeTopicName).isPresent());
+    }
+
+    // drop table topic on tree session
+    try (final ISubscriptionTreeSession session =
+        new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
+      session.open();
+      try {
+        session.dropTopic(tableTopicName);
+        fail();
+      } catch (final Exception ignored) {
+      }
+    }
+
+    // drop tree topic on table session
+    try (final ISubscriptionTableSession session =
+        new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+      try {
+        session.dropTopic(treeTopicName);
+        fail();
+      } catch (final Exception ignored) {
+      }
+    }
+
+    // drop tree topic on tree session
+    try (final ISubscriptionTreeSession session =
+        new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
+      session.open();
+      session.dropTopic(treeTopicName);
+    }
+
+    // drop table topic on table session
+    try (final ISubscriptionTableSession session =
+        new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+      session.dropTopic(tableTopicName);
+    }
+  }
+
+  @Test
+  public void testSubscriptionIsolation() throws Exception {
+    final String treeTopicName = "treeTopic";
+    final String tableTopicName = "tableTopic";
+
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+    // create tree topic
+    try (final ISubscriptionTreeSession session =
+        new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
+      session.open();
+      session.createTopic(treeTopicName);
+    }
+
+    // create table topic
+    try (final ISubscriptionTableSession session =
+        new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+      session.createTopic(tableTopicName);
+    }
+
+    // subscribe table topic on tree consumer
+    try (final ISubscriptionTreePullConsumer consumer =
+        new 
SubscriptionTreePullConsumerBuilder().host(host).port(port).build()) {
+      consumer.open();
+      try {
+        consumer.subscribe(tableTopicName);
+        fail();
+      } catch (final Exception ignored) {
+      }
+    }
+
+    // subscribe tree topic on table consumer
+    try (final ISubscriptionTablePullConsumer consumer =
+        new 
SubscriptionTablePullConsumerBuilder().host(host).port(port).build()) {
+      consumer.open();
+      try {
+        consumer.subscribe(treeTopicName);
+        fail();
+      } catch (final Exception ignored) {
+      }
+    }
+
+    // subscribe tree topic on tree consumer
+    final ISubscriptionTreePullConsumer treeConsumer =
+        new 
SubscriptionTreePullConsumerBuilder().host(host).port(port).build();
+    treeConsumer.open();
+    treeConsumer.subscribe(treeTopicName);
+
+    // subscribe table topic on table consumer
+    final ISubscriptionTablePullConsumer tableConsumer =
+        new 
SubscriptionTablePullConsumerBuilder().host(host).port(port).build();
+    tableConsumer.open();
+    tableConsumer.subscribe(tableTopicName);
+
+    // show subscription on tree session
+    try (final ISubscriptionTreeSession session =
+        new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
+      session.open();
+      Assert.assertEquals(1, session.getSubscriptions().size());
+      Assert.assertEquals(1, session.getSubscriptions(treeTopicName).size());
+      Assert.assertEquals(0, session.getSubscriptions(tableTopicName).size());
+    }
+
+    // show subscription on table session
+    try (final ISubscriptionTableSession session =
+        new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+      Assert.assertEquals(1, session.getSubscriptions().size());
+      Assert.assertEquals(1, session.getSubscriptions(tableTopicName).size());
+      Assert.assertEquals(0, session.getSubscriptions(treeTopicName).size());
+    }
+
+    // unsubscribe table topic on tree consumer
+    try {
+      treeConsumer.unsubscribe(tableTopicName);
+      fail();
+    } catch (final Exception ignored) {
+
+    }
+
+    // unsubscribe tree topic on table consumer
+    try {
+      tableConsumer.unsubscribe(treeTopicName);
+      fail();
+    } catch (final Exception ignored) {
+
+    }
+
+    // close consumers
+    treeConsumer.close();
+    tableConsumer.close();
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
index 0ae2c5bbdc8..100de1f1126 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
@@ -76,6 +76,10 @@ public class ConsumerConfig extends PipeParameters {
     return getString(ConsumerConstant.PASSWORD_KEY);
   }
 
+  public String getSqlDialect() {
+    return getString(ConsumerConstant.SQL_DIALECT_KEY);
+  }
+
   public void setConsumerId(final String consumerId) {
     attributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
   }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index 5ad1d9ba435..dd0c583e40d 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -25,6 +25,9 @@ public class ConsumerConstant {
 
   /////////////////////////////// common ///////////////////////////////
 
+  // TODO: hide from the client
+  public static final String SQL_DIALECT_KEY = "sql-dialect";
+
   public static final String HOST_KEY = "host";
   public static final String PORT_KEY = "port";
   public static final String NODE_URLS_KEY = "node-urls";
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
index 781dacb9738..df5dfcfd7f7 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
@@ -163,6 +163,7 @@ public abstract class AbstractSubscriptionProvider {
     consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
     consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username);
     consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password);
+    consumerAttributes.put(ConsumerConstant.SQL_DIALECT_KEY, 
session.getSqlDialect());
 
     final PipeSubscribeHandshakeResp resp =
         handshake(new ConsumerConfig(consumerAttributes)); // throw 
SubscriptionException
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
index d6679ea4c11..c242dbf2b08 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
@@ -31,6 +31,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class SubscriptionTableResp implements DataSet {
   private final TSStatus status;
@@ -46,19 +48,18 @@ public class SubscriptionTableResp implements DataSet {
     this.allConsumerGroupMeta = allConsumerGroupMeta;
   }
 
-  public SubscriptionTableResp filter(String topicName) {
-    if (topicName == null) {
-      return this;
-    } else {
-      final List<SubscriptionMeta> filteredSubscriptionMeta = new 
ArrayList<>();
-      for (SubscriptionMeta subscriptionMeta : allSubscriptionMeta) {
-        if (subscriptionMeta.getTopicName().equals(topicName)) {
-          filteredSubscriptionMeta.add(subscriptionMeta);
-          break;
-        }
-      }
-      return new SubscriptionTableResp(status, filteredSubscriptionMeta, 
allConsumerGroupMeta);
-    }
+  public SubscriptionTableResp filter(String topicName, boolean isTableModel) {
+    return new SubscriptionTableResp(
+        status,
+        allSubscriptionMeta.stream()
+            .filter(
+                subscriptionMeta ->
+                    (Objects.isNull(topicName)
+                            || Objects.equals(
+                                
subscriptionMeta.getTopicMeta().getTopicName(), topicName))
+                        && 
subscriptionMeta.getTopicMeta().visibleUnder(isTableModel))
+            .collect(Collectors.toList()),
+        allConsumerGroupMeta);
   }
 
   public TShowSubscriptionResp convertToTShowSubscriptionResp() {
@@ -67,7 +68,7 @@ public class SubscriptionTableResp implements DataSet {
     for (SubscriptionMeta subscriptionMeta : allSubscriptionMeta) {
       showSubscriptionInfoList.add(
           new TShowSubscriptionInfo(
-              subscriptionMeta.getTopicName(),
+              subscriptionMeta.getTopicMeta().getTopicName(),
               subscriptionMeta.getConsumerGroupId(),
               subscriptionMeta.getConsumerIds()));
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/TopicTableResp.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/TopicTableResp.java
index 672a519243c..2c2e6f94bb3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/TopicTableResp.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/TopicTableResp.java
@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class TopicTableResp implements DataSet {
   private final TSStatus status;
@@ -40,19 +42,16 @@ public class TopicTableResp implements DataSet {
     this.allTopicMeta = allTopicMeta;
   }
 
-  public TopicTableResp filter(String topicName) {
-    if (topicName == null) {
-      return this;
-    } else {
-      final List<TopicMeta> filteredTopicMeta = new ArrayList<>();
-      for (TopicMeta topicMeta : allTopicMeta) {
-        if (topicMeta.getTopicName().equals(topicName)) {
-          filteredTopicMeta.add(topicMeta);
-          break;
-        }
-      }
-      return new TopicTableResp(status, filteredTopicMeta);
-    }
+  public TopicTableResp filter(String topicName, boolean isTableModel) {
+    return new TopicTableResp(
+        status,
+        allTopicMeta.stream()
+            .filter(
+                topicMeta ->
+                    (Objects.isNull(topicName)
+                            || Objects.equals(topicMeta.getTopicName(), 
topicName))
+                        && topicMeta.visibleUnder(isTableModel))
+            .collect(Collectors.toList()));
   }
 
   public TShowTopicResp convertToTShowTopicResp() {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index 20dfce44bf2..194ec1f31f9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -153,28 +153,23 @@ public class SubscriptionCoordinator {
 
   public TSStatus dropTopic(TDropTopicReq req) {
     final String topicName = req.getTopicName();
-    final boolean isTopicExistedBeforeDrop = 
subscriptionInfo.isTopicExisted(topicName);
-    final TSStatus status = 
configManager.getProcedureManager().dropTopic(topicName);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.warn("Failed to drop topic {}. Result status: {}.", topicName, 
status);
-    }
-
-    // If the `IF EXISTS` condition is not set and the topic does not exist 
before the drop
-    // operation, return an error status indicating that the topic does not 
exist.
-    final boolean isIfExistedConditionSet =
+    final boolean isSetIfExistsCondition =
         req.isSetIfExistsCondition() && req.isIfExistsCondition();
-    return isTopicExistedBeforeDrop || isIfExistedConditionSet
-        ? status
-        : RpcUtils.getStatus(
-            TSStatusCode.TOPIC_NOT_EXIST_ERROR,
-            String.format(
-                "Failed to drop topic %s. Failures: %s does not exist.", 
topicName, topicName));
+    if (!subscriptionInfo.isTopicExisted(topicName, req.isTableModel)) {
+      return isSetIfExistsCondition
+          ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+          : RpcUtils.getStatus(
+              TSStatusCode.TOPIC_NOT_EXIST_ERROR,
+              String.format(
+                  "Failed to drop topic %s. Failures: %s does not exist.", 
topicName, topicName));
+    }
+    return configManager.getProcedureManager().dropTopic(topicName);
   }
 
   public TShowTopicResp showTopic(TShowTopicReq req) {
     try {
       return ((TopicTableResp) configManager.getConsensusManager().read(new 
ShowTopicPlan()))
-          .filter(req.getTopicName())
+          .filter(req.topicName, req.isTableModel)
           .convertToTShowTopicResp();
     } catch (Exception e) {
       LOGGER.warn("Failed to show topic info.", e);
@@ -252,7 +247,7 @@ public class SubscriptionCoordinator {
     try {
       return ((SubscriptionTableResp)
               configManager.getConsensusManager().read(new 
ShowSubscriptionPlan()))
-          .filter(req.getTopicName())
+          .filter(req.getTopicName(), req.isTableModel)
           .convertToTShowSubscriptionResp();
     } catch (Exception e) {
       LOGGER.warn("Failed to show subscription info.", e);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
index 2ccde563866..2329c05a30b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
@@ -268,6 +268,15 @@ public class SubscriptionInfo implements SnapshotProcessor 
{
     }
   }
 
+  public boolean isTopicExisted(String topicName, boolean isTableModel) {
+    acquireReadLock();
+    try {
+      return topicMetaKeeper.containsTopicMeta(topicName, isTableModel);
+    } finally {
+      releaseReadLock();
+    }
+  }
+
   public TopicMeta getTopicMeta(String topicName) {
     acquireReadLock();
     try {
@@ -567,23 +576,39 @@ public class SubscriptionInfo implements 
SnapshotProcessor {
 
   private void checkBeforeSubscribeInternal(TSubscribeReq subscribeReq)
       throws SubscriptionException {
+    String consumerId = subscribeReq.getConsumerId();
+    String consumerGroupId = subscribeReq.getConsumerGroupId();
+
     // 1. Check if the consumer exists
-    if (!isConsumerExisted(subscribeReq.getConsumerGroupId(), 
subscribeReq.getConsumerId())) {
+    if (!isConsumerExisted(consumerGroupId, consumerId)) {
       // There is no consumer with the same consumerId and consumerGroupId,
       // we should end the procedure
       final String exceptionMessage =
           String.format(
               "Failed to subscribe because the consumer %s in consumer group 
%s does not exist",
-              subscribeReq.getConsumerId(), subscribeReq.getConsumerGroupId());
+              consumerId, consumerGroupId);
       LOGGER.warn(exceptionMessage);
       throw new SubscriptionException(exceptionMessage);
     }
 
-    // 2. Check if all topics exist. No need to check if already subscribed.
-    for (String topic : subscribeReq.getTopicNames()) {
-      if (!isTopicExisted(topic)) {
+    ConsumerGroupMeta consumerGroupMeta = 
getConsumerGroupMeta(consumerGroupId);
+
+    // 2. Check all topics will be subscribed
+    for (String topicName : subscribeReq.getTopicNames()) {
+      // 2.1. check if exist
+      if (!isTopicExisted(topicName, subscribeReq.isTableModel)) {
+        final String exceptionMessage =
+            String.format("Failed to subscribe because the topic %s does not 
exist", topicName);
+        LOGGER.warn(exceptionMessage);
+        throw new SubscriptionException(exceptionMessage);
+      }
+
+      // 2.2. check username
+      if (!consumerGroupMeta.allowSubscribeTopicForConsumer(topicName, 
consumerId)) {
         final String exceptionMessage =
-            String.format("Failed to subscribe because the topic %s does not 
exist", topic);
+            String.format(
+                "Failed to subscribe topic %s for consumer %s because 
inconsistent username under the same consumer group",
+                topicName, consumerId);
         LOGGER.warn(exceptionMessage);
         throw new SubscriptionException(exceptionMessage);
       }
@@ -616,7 +641,7 @@ public class SubscriptionInfo implements SnapshotProcessor {
 
     // 2. Check if all topics exist. No need to check if already subscribed.
     for (String topic : unsubscribeReq.getTopicNames()) {
-      if (!isTopicExisted(topic)) {
+      if (!isTopicExisted(topic, unsubscribeReq.isTableModel)) {
         final String exceptionMessage =
             String.format("Failed to unsubscribe because the topic %s does not 
exist", topic);
         LOGGER.warn(exceptionMessage);
@@ -647,8 +672,7 @@ public class SubscriptionInfo implements SnapshotProcessor {
                 consumerGroupId, topicMeta.getTopicName());
         if (!subscribedConsumerIDs.isEmpty()) {
           allSubscriptions.add(
-              new SubscriptionMeta(
-                  topicMeta.getTopicName(), consumerGroupId, 
subscribedConsumerIDs));
+              new SubscriptionMeta(topicMeta, consumerGroupId, 
subscribedConsumerIDs));
         }
       }
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index 31b852b38bb..cb5edd8cd91 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -107,16 +107,6 @@ public class CreateSubscriptionProcedure extends 
AbstractOperateSubscriptionAndP
     for (final String topicName : subscribeReq.getTopicNames()) {
       final String pipeName =
           PipeStaticMeta.generateSubscriptionPipeName(topicName, 
consumerGroupId);
-      // check username
-      if (!consumerGroupMeta.allowSubscribeTopicForConsumer(topicName, 
consumerId)) {
-        final String exceptionMessage =
-            String.format(
-                "Failed to subscribe topic %s for consumer %s because 
inconsistent username under the same consumer group",
-                topicName, consumerId);
-        LOGGER.warn(exceptionMessage);
-        throw new SubscriptionException(exceptionMessage);
-      }
-
       if (!subscriptionInfo.get().isTopicSubscribedByConsumerGroup(topicName, 
consumerGroupId)
           // even if there existed subscription meta, if there is no 
corresponding pipe meta, it
           // will try to create the pipe
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index d8e680b432c..7699da64050 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -575,7 +575,11 @@ public class InformationSchemaContentSupplierFactory {
       super(dataTypes);
       try (final ConfigNodeClient client =
           
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
-        iterator = client.showTopic(new 
TShowTopicReq()).getTopicInfoList().iterator();
+        iterator =
+            client
+                .showTopic(new TShowTopicReq().setIsTableModel(true))
+                .getTopicInfoList()
+                .iterator();
       } catch (final Exception e) {
         lastException = e;
       }
@@ -606,7 +610,7 @@ public class InformationSchemaContentSupplierFactory {
           
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
         iterator =
             client
-                .showSubscription(new TShowSubscriptionReq())
+                .showSubscription(new 
TShowSubscriptionReq().setIsTableModel(true))
                 .getSubscriptionInfoList()
                 .iterator();
       } catch (final Exception e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 23b065b88c9..a4e36bc3837 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2357,6 +2357,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       if (showSubscriptionsStatement.getTopicName() != null) {
         
showSubscriptionReq.setTopicName(showSubscriptionsStatement.getTopicName());
       }
+      
showSubscriptionReq.setIsTableModel(showSubscriptionsStatement.isTableModel());
 
       final TShowSubscriptionResp showSubscriptionResp =
           configNodeClient.showSubscription(showSubscriptionReq);
@@ -2456,7 +2457,8 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
           configNodeClient.dropTopicExtended(
               new TDropTopicReq()
                   
.setIfExistsCondition(dropTopicStatement.hasIfExistsCondition())
-                  .setTopicName(dropTopicStatement.getTopicName()));
+                  .setTopicName(dropTopicStatement.getTopicName())
+                  .setIsTableModel(dropTopicStatement.isTableModel()));
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         future.setException(new IoTDBException(tsStatus.message, 
tsStatus.code));
       } else {
@@ -2478,6 +2480,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       if (showTopicsStatement.getTopicName() != null) {
         showTopicReq.setTopicName(showTopicsStatement.getTopicName());
       }
+      showTopicReq.setIsTableModel(showTopicsStatement.isTableModel());
 
       final TShowTopicResp showTopicResp = 
configNodeClient.showTopic(showTopicReq);
       if (showTopicResp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
index fe28c387bf7..d7ff17d2873 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
@@ -39,6 +39,7 @@ public class DropTopicTask implements IConfigTask {
     this.dropTopicStatement = new DropTopicStatement();
     this.dropTopicStatement.setTopicName(dropTopic.getTopicName());
     this.dropTopicStatement.setIfExists(dropTopic.hasIfExistsCondition());
+    this.dropTopicStatement.setTableModel(true);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
index b421a25e3ce..c59a5d7b902 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
@@ -50,6 +50,7 @@ public class ShowSubscriptionsTask implements IConfigTask {
   public ShowSubscriptionsTask(final ShowSubscriptions showSubscriptions) {
     this.showSubscriptionsStatement = new ShowSubscriptionsStatement();
     
this.showSubscriptionsStatement.setTopicName(showSubscriptions.getTopicName());
+    this.showSubscriptionsStatement.setTableModel(true);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
index 55578168d19..1871ef66a50 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
@@ -50,6 +50,7 @@ public class ShowTopicsTask implements IConfigTask {
   public ShowTopicsTask(final ShowTopics showTopics) {
     this.showTopicsStatement = new ShowTopicsStatement();
     this.showTopicsStatement.setTopicName(showTopics.getTopicName());
+    this.showTopicsStatement.setTableModel(true);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
index 97d173b9fdd..3bc5d866bc4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
@@ -37,6 +37,7 @@ public class DropTopicStatement extends Statement implements 
IConfigStatement {
 
   private String topicName;
   private boolean ifExistsCondition;
+  private boolean isTableModel;
 
   public DropTopicStatement() {
     super();
@@ -51,14 +52,22 @@ public class DropTopicStatement extends Statement 
implements IConfigStatement {
     return ifExistsCondition;
   }
 
-  public void setTopicName(String topicName) {
+  public boolean isTableModel() {
+    return isTableModel;
+  }
+
+  public void setTopicName(final String topicName) {
     this.topicName = topicName;
   }
 
-  public void setIfExists(boolean ifExistsCondition) {
+  public void setIfExists(final boolean ifExistsCondition) {
     this.ifExistsCondition = ifExistsCondition;
   }
 
+  public void setTableModel(final boolean tableModel) {
+    this.isTableModel = tableModel;
+  }
+
   @Override
   public QueryType getQueryType() {
     return QueryType.WRITE;
@@ -70,12 +79,12 @@ public class DropTopicStatement extends Statement 
implements IConfigStatement {
   }
 
   @Override
-  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+  public <R, C> R accept(final StatementVisitor<R, C> visitor, final C 
context) {
     return visitor.visitDropTopic(this, context);
   }
 
   @Override
-  public TSStatus checkPermissionBeforeProcess(String userName) {
+  public TSStatus checkPermissionBeforeProcess(final String userName) {
     if (AuthorityChecker.SUPER_USER.equals(userName)) {
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowSubscriptionsStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowSubscriptionsStatement.java
index ff7267f9881..9bccb3270ae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowSubscriptionsStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowSubscriptionsStatement.java
@@ -36,6 +36,7 @@ import java.util.List;
 public class ShowSubscriptionsStatement extends Statement implements 
IConfigStatement {
 
   private String topicName;
+  private boolean isTableModel;
 
   public ShowSubscriptionsStatement() {
     super();
@@ -46,12 +47,20 @@ public class ShowSubscriptionsStatement extends Statement 
implements IConfigStat
     return topicName;
   }
 
-  public void setTopicName(String topicName) {
+  public boolean isTableModel() {
+    return isTableModel;
+  }
+
+  public void setTopicName(final String topicName) {
     this.topicName = topicName;
   }
 
+  public void setTableModel(final boolean tableModel) {
+    this.isTableModel = tableModel;
+  }
+
   @Override
-  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+  public <R, C> R accept(final StatementVisitor<R, C> visitor, final C 
context) {
     return visitor.visitShowSubscriptions(this, context);
   }
 
@@ -66,7 +75,7 @@ public class ShowSubscriptionsStatement extends Statement 
implements IConfigStat
   }
 
   @Override
-  public TSStatus checkPermissionBeforeProcess(String userName) {
+  public TSStatus checkPermissionBeforeProcess(final String userName) {
     if (AuthorityChecker.SUPER_USER.equals(userName)) {
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowTopicsStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowTopicsStatement.java
index 82a4a87dc93..eee05092407 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowTopicsStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowTopicsStatement.java
@@ -36,6 +36,7 @@ import java.util.List;
 public class ShowTopicsStatement extends Statement implements IConfigStatement 
{
 
   private String topicName;
+  private boolean isTableModel;
 
   public ShowTopicsStatement() {
     super();
@@ -46,12 +47,20 @@ public class ShowTopicsStatement extends Statement 
implements IConfigStatement {
     return topicName;
   }
 
-  public void setTopicName(String topicName) {
+  public boolean isTableModel() {
+    return isTableModel;
+  }
+
+  public void setTopicName(final String topicName) {
     this.topicName = topicName;
   }
 
+  public void setTableModel(final boolean tableModel) {
+    this.isTableModel = tableModel;
+  }
+
   @Override
-  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+  public <R, C> R accept(final StatementVisitor<R, C> visitor, final C 
context) {
     return visitor.visitShowTopics(this, context);
   }
 
@@ -66,7 +75,7 @@ public class ShowTopicsStatement extends Statement implements 
IConfigStatement {
   }
 
   @Override
-  public TSStatus checkPermissionBeforeProcess(String userName) {
+  public TSStatus checkPermissionBeforeProcess(final String userName) {
     if (AuthorityChecker.SUPER_USER.equals(userName)) {
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index fe9e1345a17..dae3a5dc97c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -112,6 +112,8 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
   private final ThreadLocal<ConsumerConfig> consumerConfigThreadLocal = new 
ThreadLocal<>();
   private final ThreadLocal<PollTimer> pollTimerThreadLocal = new 
ThreadLocal<>();
 
+  private static final String SQL_DIALECT_TABLE_VALUE = "table";
+
   @Override
   public final TPipeSubscribeResp handle(final TPipeSubscribeReq req) {
     final short reqType = req.getType();
@@ -749,7 +751,9 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
           new TSubscribeReq()
               .setConsumerId(consumerConfig.getConsumerId())
               .setConsumerGroupId(consumerConfig.getConsumerGroupId())
-              .setTopicNames(topicNames);
+              .setTopicNames(topicNames)
+              .setIsTableModel(
+                  Objects.equals(consumerConfig.getSqlDialect(), 
SQL_DIALECT_TABLE_VALUE));
       final TSStatus tsStatus = configNodeClient.createSubscription(req);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         LOGGER.warn(
@@ -789,7 +793,9 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
           new TUnsubscribeReq()
               .setConsumerId(consumerConfig.getConsumerId())
               .setConsumerGroupId(consumerConfig.getConsumerGroupId())
-              .setTopicNames(topicNames);
+              .setTopicNames(topicNames)
+              .setIsTableModel(
+                  Objects.equals(consumerConfig.getSqlDialect(), 
SQL_DIALECT_TABLE_VALUE));
       final TSStatus tsStatus = configNodeClient.dropSubscription(req);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         LOGGER.warn(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtils.java
index 5d44fa1379c..94d7ea92da6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtils.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.rpc.subscription.config.TopicConfig;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,4 +131,13 @@ public class VisibilityUtils {
         extractorParameters.getAttribute());
     return Visibility.NONE;
   }
+
+  public static Visibility calculateFromTopicConfig(final TopicConfig config) {
+    final boolean isTreeDialect =
+        config
+            .getStringOrDefault(
+                SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE)
+            .equals(SystemConstant.SQL_DIALECT_TREE_VALUE);
+    return !isTreeDialect ? Visibility.TABLE_ONLY : Visibility.TREE_ONLY;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
index 91bbb461223..36935053c92 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.commons.subscription.meta.subscription;
 
+import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
+
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
@@ -33,7 +35,7 @@ import java.util.Set;
 /** SubscriptionMeta is created for show subscription and is not stored in 
meta keeper. */
 public class SubscriptionMeta {
 
-  private String topicName;
+  private TopicMeta topicMeta;
   private String consumerGroupId;
   private Set<String> consumerIds;
 
@@ -41,14 +43,14 @@ public class SubscriptionMeta {
     // Empty constructor
   }
 
-  public SubscriptionMeta(String topicName, String consumerGroupId, 
Set<String> consumerIds) {
-    this.topicName = topicName;
+  public SubscriptionMeta(TopicMeta topicMeta, String consumerGroupId, 
Set<String> consumerIds) {
+    this.topicMeta = topicMeta;
     this.consumerGroupId = consumerGroupId;
     this.consumerIds = consumerIds;
   }
 
-  public String getTopicName() {
-    return topicName;
+  public TopicMeta getTopicMeta() {
+    return topicMeta;
   }
 
   public String getConsumerGroupId() {
@@ -67,7 +69,7 @@ public class SubscriptionMeta {
   }
 
   public void serialize(DataOutputStream outputStream) throws IOException {
-    ReadWriteIOUtils.write(topicName, outputStream);
+    topicMeta.serialize(outputStream);
     ReadWriteIOUtils.write(consumerGroupId, outputStream);
 
     ReadWriteIOUtils.write(consumerIds.size(), outputStream);
@@ -77,7 +79,7 @@ public class SubscriptionMeta {
   }
 
   public void serialize(FileOutputStream outputStream) throws IOException {
-    ReadWriteIOUtils.write(topicName, outputStream);
+    topicMeta.serialize(outputStream);
     ReadWriteIOUtils.write(consumerGroupId, outputStream);
 
     ReadWriteIOUtils.write(consumerIds.size(), outputStream);
@@ -89,7 +91,7 @@ public class SubscriptionMeta {
   public static SubscriptionMeta deserialize(InputStream inputStream) throws 
IOException {
     final SubscriptionMeta subscriptionMeta = new SubscriptionMeta();
 
-    subscriptionMeta.topicName = ReadWriteIOUtils.readString(inputStream);
+    subscriptionMeta.topicMeta = TopicMeta.deserialize(inputStream);
     subscriptionMeta.consumerGroupId = 
ReadWriteIOUtils.readString(inputStream);
     subscriptionMeta.consumerIds = new HashSet<>();
 
@@ -104,7 +106,7 @@ public class SubscriptionMeta {
   public static SubscriptionMeta deserialize(ByteBuffer byteBuffer) {
     final SubscriptionMeta subscriptionMeta = new SubscriptionMeta();
 
-    subscriptionMeta.topicName = ReadWriteIOUtils.readString(byteBuffer);
+    subscriptionMeta.topicMeta = TopicMeta.deserialize(byteBuffer);
     subscriptionMeta.consumerGroupId = ReadWriteIOUtils.readString(byteBuffer);
     subscriptionMeta.consumerIds = new HashSet<>();
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 3a092d9f80f..95ba6fd85c1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.commons.subscription.meta.topic;
 
 import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
+import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.rpc.subscription.config.TopicConfig;
 
@@ -227,6 +229,13 @@ public class TopicMeta {
     return connectorAttributes;
   }
 
+  /////////////////////////////////  Tree & Table Isolation  
/////////////////////////////////
+
+  public boolean visibleUnder(final boolean isTableModel) {
+    final Visibility visibility = 
VisibilityUtils.calculateFromTopicConfig(config);
+    return VisibilityUtils.isCompatible(visibility, isTableModel);
+  }
+
   ////////////////////////////////////// Object 
////////////////////////////////
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMetaKeeper.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMetaKeeper.java
index 42341da441d..741f7c38058 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMetaKeeper.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMetaKeeper.java
@@ -80,6 +80,14 @@ public class TopicMetaKeeper {
     return topicNameToTopicMetaMap.containsKey(topicName);
   }
 
+  public boolean containsTopicMeta(String topicName, boolean isTableModel) {
+    TopicMeta topicMeta = topicNameToTopicMetaMap.get(topicName);
+    if (Objects.isNull(topicMeta)) {
+      return false;
+    }
+    return topicMeta.visibleUnder(isTableModel);
+  }
+
   public void clear() {
     this.topicNameToTopicMetaMap.clear();
   }
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 78abd9b6b61..5bff9167763 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -888,10 +888,12 @@ struct TCreateTopicReq {
 struct TDropTopicReq {
     1: required string topicName
     2: optional bool ifExistsCondition
+    3: optional bool isTableModel
 }
 
 struct TShowTopicReq {
     1: optional string topicName
+    2: optional bool isTableModel
 }
 
 struct TShowTopicResp {
@@ -933,16 +935,19 @@ struct TSubscribeReq {
     1: required string consumerId
     2: required string consumerGroupId
     3: required set<string> topicNames
+    4: optional bool isTableModel
 }
 
 struct TUnsubscribeReq {
     1: required string consumerId
     2: required string consumerGroupId
     3: required set<string> topicNames
+    4: optional bool isTableModel
 }
 
 struct TShowSubscriptionReq {
     1: optional string topicName
+    2: optional bool isTableModel
 }
 
 struct TShowSubscriptionResp {

Reply via email to