This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 29a68365b86 Subscription: tree/table model isolation for
topic/consumer/subscription entities (#15484)
29a68365b86 is described below
commit 29a68365b8612613eeb222746cb1d5b9061e9a27
Author: VGalaxies <[email protected]>
AuthorDate: Mon May 12 17:10:18 2025 +0800
Subscription: tree/table model isolation for topic/consumer/subscription
entities (#15484)
---
.../tablemodel/IoTDBSubscriptionIsolationIT.java | 218 +++++++++++++++++++++
.../rpc/subscription/config/ConsumerConfig.java | 4 +
.../rpc/subscription/config/ConsumerConstant.java | 3 +
.../base/AbstractSubscriptionProvider.java | 1 +
.../subscription/SubscriptionTableResp.java | 29 +--
.../response/subscription/TopicTableResp.java | 25 ++-
.../subscription/SubscriptionCoordinator.java | 29 ++-
.../persistence/subscription/SubscriptionInfo.java | 42 +++-
.../subscription/CreateSubscriptionProcedure.java | 10 -
.../InformationSchemaContentSupplierFactory.java | 8 +-
.../config/executor/ClusterConfigTaskExecutor.java | 5 +-
.../config/sys/subscription/DropTopicTask.java | 1 +
.../sys/subscription/ShowSubscriptionsTask.java | 1 +
.../config/sys/subscription/ShowTopicsTask.java | 1 +
.../metadata/subscription/DropTopicStatement.java | 17 +-
.../subscription/ShowSubscriptionsStatement.java | 15 +-
.../metadata/subscription/ShowTopicsStatement.java | 15 +-
.../receiver/SubscriptionReceiverV1.java | 10 +-
.../datastructure/visibility/VisibilityUtils.java | 10 +
.../meta/subscription/SubscriptionMeta.java | 20 +-
.../commons/subscription/meta/topic/TopicMeta.java | 9 +
.../subscription/meta/topic/TopicMetaKeeper.java | 8 +
.../src/main/thrift/confignode.thrift | 5 +
23 files changed, 399 insertions(+), 87 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/tablemodel/IoTDBSubscriptionIsolationIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/tablemodel/IoTDBSubscriptionIsolationIT.java
new file mode 100644
index 00000000000..7a53bf08744
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/tablemodel/IoTDBSubscriptionIsolationIT.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.local.tablemodel;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.session.subscription.ISubscriptionTableSession;
+import org.apache.iotdb.session.subscription.ISubscriptionTreeSession;
+import org.apache.iotdb.session.subscription.SubscriptionTableSessionBuilder;
+import org.apache.iotdb.session.subscription.SubscriptionTreeSessionBuilder;
+import
org.apache.iotdb.session.subscription.consumer.ISubscriptionTablePullConsumer;
+import
org.apache.iotdb.session.subscription.consumer.ISubscriptionTreePullConsumer;
+import
org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumerBuilder;
+import
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumerBuilder;
+import org.apache.iotdb.subscription.it.local.AbstractSubscriptionLocalIT;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBSubscriptionIsolationIT extends AbstractSubscriptionLocalIT {
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Test
+ public void testTopicIsolation() throws Exception {
+ final String treeTopicName = "treeTopic";
+ final String tableTopicName = "tableTopic";
+
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+ // create tree topic
+ try (final ISubscriptionTreeSession session =
+ new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
+ session.open();
+ session.createTopic(treeTopicName);
+ }
+
+ // create table topic
+ try (final ISubscriptionTableSession session =
+ new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+ session.createTopic(tableTopicName);
+ }
+
+ // show topic on tree session
+ try (final ISubscriptionTreeSession session =
+ new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
+ session.open();
+ Assert.assertEquals(1, session.getTopics().size());
+ Assert.assertTrue(session.getTopic(treeTopicName).isPresent());
+ Assert.assertFalse(session.getTopic(tableTopicName).isPresent());
+ }
+
+ // show topic on table session
+ try (final ISubscriptionTableSession session =
+ new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+ Assert.assertEquals(1, session.getTopics().size());
+ Assert.assertTrue(session.getTopic(tableTopicName).isPresent());
+ Assert.assertFalse(session.getTopic(treeTopicName).isPresent());
+ }
+
+ // drop table topic on tree session
+ try (final ISubscriptionTreeSession session =
+ new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
+ session.open();
+ try {
+ session.dropTopic(tableTopicName);
+ fail();
+ } catch (final Exception ignored) {
+ }
+ }
+
+ // drop tree topic on table session
+ try (final ISubscriptionTableSession session =
+ new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+ try {
+ session.dropTopic(treeTopicName);
+ fail();
+ } catch (final Exception ignored) {
+ }
+ }
+
+ // drop tree topic on tree session
+ try (final ISubscriptionTreeSession session =
+ new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
+ session.open();
+ session.dropTopic(treeTopicName);
+ }
+
+ // drop table topic on table session
+ try (final ISubscriptionTableSession session =
+ new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+ session.dropTopic(tableTopicName);
+ }
+ }
+
+ @Test
+ public void testSubscriptionIsolation() throws Exception {
+ final String treeTopicName = "treeTopic";
+ final String tableTopicName = "tableTopic";
+
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+ // create tree topic
+ try (final ISubscriptionTreeSession session =
+ new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
+ session.open();
+ session.createTopic(treeTopicName);
+ }
+
+ // create table topic
+ try (final ISubscriptionTableSession session =
+ new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+ session.createTopic(tableTopicName);
+ }
+
+ // subscribe table topic on tree consumer
+ try (final ISubscriptionTreePullConsumer consumer =
+ new
SubscriptionTreePullConsumerBuilder().host(host).port(port).build()) {
+ consumer.open();
+ try {
+ consumer.subscribe(tableTopicName);
+ fail();
+ } catch (final Exception ignored) {
+ }
+ }
+
+ // subscribe tree topic on table consumer
+ try (final ISubscriptionTablePullConsumer consumer =
+ new
SubscriptionTablePullConsumerBuilder().host(host).port(port).build()) {
+ consumer.open();
+ try {
+ consumer.subscribe(treeTopicName);
+ fail();
+ } catch (final Exception ignored) {
+ }
+ }
+
+ // subscribe tree topic on tree consumer
+ final ISubscriptionTreePullConsumer treeConsumer =
+ new
SubscriptionTreePullConsumerBuilder().host(host).port(port).build();
+ treeConsumer.open();
+ treeConsumer.subscribe(treeTopicName);
+
+ // subscribe table topic on table consumer
+ final ISubscriptionTablePullConsumer tableConsumer =
+ new
SubscriptionTablePullConsumerBuilder().host(host).port(port).build();
+ tableConsumer.open();
+ tableConsumer.subscribe(tableTopicName);
+
+ // show subscription on tree session
+ try (final ISubscriptionTreeSession session =
+ new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
+ session.open();
+ Assert.assertEquals(1, session.getSubscriptions().size());
+ Assert.assertEquals(1, session.getSubscriptions(treeTopicName).size());
+ Assert.assertEquals(0, session.getSubscriptions(tableTopicName).size());
+ }
+
+ // show subscription on table session
+ try (final ISubscriptionTableSession session =
+ new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+ Assert.assertEquals(1, session.getSubscriptions().size());
+ Assert.assertEquals(1, session.getSubscriptions(tableTopicName).size());
+ Assert.assertEquals(0, session.getSubscriptions(treeTopicName).size());
+ }
+
+ // unsubscribe table topic on tree consumer
+ try {
+ treeConsumer.unsubscribe(tableTopicName);
+ fail();
+ } catch (final Exception ignored) {
+
+ }
+
+ // unsubscribe tree topic on table consumer
+ try {
+ tableConsumer.unsubscribe(treeTopicName);
+ fail();
+ } catch (final Exception ignored) {
+
+ }
+
+ // close consumers
+ treeConsumer.close();
+ tableConsumer.close();
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
index 0ae2c5bbdc8..100de1f1126 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
@@ -76,6 +76,10 @@ public class ConsumerConfig extends PipeParameters {
return getString(ConsumerConstant.PASSWORD_KEY);
}
+ public String getSqlDialect() {
+ return getString(ConsumerConstant.SQL_DIALECT_KEY);
+ }
+
public void setConsumerId(final String consumerId) {
attributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index 5ad1d9ba435..dd0c583e40d 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -25,6 +25,9 @@ public class ConsumerConstant {
/////////////////////////////// common ///////////////////////////////
+ // TODO: hide from the client
+ public static final String SQL_DIALECT_KEY = "sql-dialect";
+
public static final String HOST_KEY = "host";
public static final String PORT_KEY = "port";
public static final String NODE_URLS_KEY = "node-urls";
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
index 781dacb9738..df5dfcfd7f7 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
@@ -163,6 +163,7 @@ public abstract class AbstractSubscriptionProvider {
consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username);
consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password);
+ consumerAttributes.put(ConsumerConstant.SQL_DIALECT_KEY,
session.getSqlDialect());
final PipeSubscribeHandshakeResp resp =
handshake(new ConsumerConfig(consumerAttributes)); // throw
SubscriptionException
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
index d6679ea4c11..c242dbf2b08 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
@@ -31,6 +31,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
public class SubscriptionTableResp implements DataSet {
private final TSStatus status;
@@ -46,19 +48,18 @@ public class SubscriptionTableResp implements DataSet {
this.allConsumerGroupMeta = allConsumerGroupMeta;
}
- public SubscriptionTableResp filter(String topicName) {
- if (topicName == null) {
- return this;
- } else {
- final List<SubscriptionMeta> filteredSubscriptionMeta = new
ArrayList<>();
- for (SubscriptionMeta subscriptionMeta : allSubscriptionMeta) {
- if (subscriptionMeta.getTopicName().equals(topicName)) {
- filteredSubscriptionMeta.add(subscriptionMeta);
- break;
- }
- }
- return new SubscriptionTableResp(status, filteredSubscriptionMeta,
allConsumerGroupMeta);
- }
+ public SubscriptionTableResp filter(String topicName, boolean isTableModel) {
+ return new SubscriptionTableResp(
+ status,
+ allSubscriptionMeta.stream()
+ .filter(
+ subscriptionMeta ->
+ (Objects.isNull(topicName)
+ || Objects.equals(
+
subscriptionMeta.getTopicMeta().getTopicName(), topicName))
+ &&
subscriptionMeta.getTopicMeta().visibleUnder(isTableModel))
+ .collect(Collectors.toList()),
+ allConsumerGroupMeta);
}
public TShowSubscriptionResp convertToTShowSubscriptionResp() {
@@ -67,7 +68,7 @@ public class SubscriptionTableResp implements DataSet {
for (SubscriptionMeta subscriptionMeta : allSubscriptionMeta) {
showSubscriptionInfoList.add(
new TShowSubscriptionInfo(
- subscriptionMeta.getTopicName(),
+ subscriptionMeta.getTopicMeta().getTopicName(),
subscriptionMeta.getConsumerGroupId(),
subscriptionMeta.getConsumerIds()));
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/TopicTableResp.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/TopicTableResp.java
index 672a519243c..2c2e6f94bb3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/TopicTableResp.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/TopicTableResp.java
@@ -30,6 +30,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
public class TopicTableResp implements DataSet {
private final TSStatus status;
@@ -40,19 +42,16 @@ public class TopicTableResp implements DataSet {
this.allTopicMeta = allTopicMeta;
}
- public TopicTableResp filter(String topicName) {
- if (topicName == null) {
- return this;
- } else {
- final List<TopicMeta> filteredTopicMeta = new ArrayList<>();
- for (TopicMeta topicMeta : allTopicMeta) {
- if (topicMeta.getTopicName().equals(topicName)) {
- filteredTopicMeta.add(topicMeta);
- break;
- }
- }
- return new TopicTableResp(status, filteredTopicMeta);
- }
+ public TopicTableResp filter(String topicName, boolean isTableModel) {
+ return new TopicTableResp(
+ status,
+ allTopicMeta.stream()
+ .filter(
+ topicMeta ->
+ (Objects.isNull(topicName)
+ || Objects.equals(topicMeta.getTopicName(),
topicName))
+ && topicMeta.visibleUnder(isTableModel))
+ .collect(Collectors.toList()));
}
public TShowTopicResp convertToTShowTopicResp() {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index 20dfce44bf2..194ec1f31f9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -153,28 +153,23 @@ public class SubscriptionCoordinator {
public TSStatus dropTopic(TDropTopicReq req) {
final String topicName = req.getTopicName();
- final boolean isTopicExistedBeforeDrop =
subscriptionInfo.isTopicExisted(topicName);
- final TSStatus status =
configManager.getProcedureManager().dropTopic(topicName);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn("Failed to drop topic {}. Result status: {}.", topicName,
status);
- }
-
- // If the `IF EXISTS` condition is not set and the topic does not exist
before the drop
- // operation, return an error status indicating that the topic does not
exist.
- final boolean isIfExistedConditionSet =
+ final boolean isSetIfExistsCondition =
req.isSetIfExistsCondition() && req.isIfExistsCondition();
- return isTopicExistedBeforeDrop || isIfExistedConditionSet
- ? status
- : RpcUtils.getStatus(
- TSStatusCode.TOPIC_NOT_EXIST_ERROR,
- String.format(
- "Failed to drop topic %s. Failures: %s does not exist.",
topicName, topicName));
+ if (!subscriptionInfo.isTopicExisted(topicName, req.isTableModel)) {
+ return isSetIfExistsCondition
+ ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+ : RpcUtils.getStatus(
+ TSStatusCode.TOPIC_NOT_EXIST_ERROR,
+ String.format(
+ "Failed to drop topic %s. Failures: %s does not exist.",
topicName, topicName));
+ }
+ return configManager.getProcedureManager().dropTopic(topicName);
}
public TShowTopicResp showTopic(TShowTopicReq req) {
try {
return ((TopicTableResp) configManager.getConsensusManager().read(new
ShowTopicPlan()))
- .filter(req.getTopicName())
+ .filter(req.topicName, req.isTableModel)
.convertToTShowTopicResp();
} catch (Exception e) {
LOGGER.warn("Failed to show topic info.", e);
@@ -252,7 +247,7 @@ public class SubscriptionCoordinator {
try {
return ((SubscriptionTableResp)
configManager.getConsensusManager().read(new
ShowSubscriptionPlan()))
- .filter(req.getTopicName())
+ .filter(req.getTopicName(), req.isTableModel)
.convertToTShowSubscriptionResp();
} catch (Exception e) {
LOGGER.warn("Failed to show subscription info.", e);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
index 2ccde563866..2329c05a30b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
@@ -268,6 +268,15 @@ public class SubscriptionInfo implements SnapshotProcessor
{
}
}
+ public boolean isTopicExisted(String topicName, boolean isTableModel) {
+ acquireReadLock();
+ try {
+ return topicMetaKeeper.containsTopicMeta(topicName, isTableModel);
+ } finally {
+ releaseReadLock();
+ }
+ }
+
public TopicMeta getTopicMeta(String topicName) {
acquireReadLock();
try {
@@ -567,23 +576,39 @@ public class SubscriptionInfo implements
SnapshotProcessor {
private void checkBeforeSubscribeInternal(TSubscribeReq subscribeReq)
throws SubscriptionException {
+ String consumerId = subscribeReq.getConsumerId();
+ String consumerGroupId = subscribeReq.getConsumerGroupId();
+
// 1. Check if the consumer exists
- if (!isConsumerExisted(subscribeReq.getConsumerGroupId(),
subscribeReq.getConsumerId())) {
+ if (!isConsumerExisted(consumerGroupId, consumerId)) {
// There is no consumer with the same consumerId and consumerGroupId,
// we should end the procedure
final String exceptionMessage =
String.format(
"Failed to subscribe because the consumer %s in consumer group
%s does not exist",
- subscribeReq.getConsumerId(), subscribeReq.getConsumerGroupId());
+ consumerId, consumerGroupId);
LOGGER.warn(exceptionMessage);
throw new SubscriptionException(exceptionMessage);
}
- // 2. Check if all topics exist. No need to check if already subscribed.
- for (String topic : subscribeReq.getTopicNames()) {
- if (!isTopicExisted(topic)) {
+ ConsumerGroupMeta consumerGroupMeta =
getConsumerGroupMeta(consumerGroupId);
+
+ // 2. Check all topics will be subscribed
+ for (String topicName : subscribeReq.getTopicNames()) {
+ // 2.1. check if exist
+ if (!isTopicExisted(topicName, subscribeReq.isTableModel)) {
+ final String exceptionMessage =
+ String.format("Failed to subscribe because the topic %s does not
exist", topicName);
+ LOGGER.warn(exceptionMessage);
+ throw new SubscriptionException(exceptionMessage);
+ }
+
+ // 2.2. check username
+ if (!consumerGroupMeta.allowSubscribeTopicForConsumer(topicName,
consumerId)) {
final String exceptionMessage =
- String.format("Failed to subscribe because the topic %s does not
exist", topic);
+ String.format(
+ "Failed to subscribe topic %s for consumer %s because
inconsistent username under the same consumer group",
+ topicName, consumerId);
LOGGER.warn(exceptionMessage);
throw new SubscriptionException(exceptionMessage);
}
@@ -616,7 +641,7 @@ public class SubscriptionInfo implements SnapshotProcessor {
// 2. Check if all topics exist. No need to check if already subscribed.
for (String topic : unsubscribeReq.getTopicNames()) {
- if (!isTopicExisted(topic)) {
+ if (!isTopicExisted(topic, unsubscribeReq.isTableModel)) {
final String exceptionMessage =
String.format("Failed to unsubscribe because the topic %s does not
exist", topic);
LOGGER.warn(exceptionMessage);
@@ -647,8 +672,7 @@ public class SubscriptionInfo implements SnapshotProcessor {
consumerGroupId, topicMeta.getTopicName());
if (!subscribedConsumerIDs.isEmpty()) {
allSubscriptions.add(
- new SubscriptionMeta(
- topicMeta.getTopicName(), consumerGroupId,
subscribedConsumerIDs));
+ new SubscriptionMeta(topicMeta, consumerGroupId,
subscribedConsumerIDs));
}
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index 31b852b38bb..cb5edd8cd91 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -107,16 +107,6 @@ public class CreateSubscriptionProcedure extends
AbstractOperateSubscriptionAndP
for (final String topicName : subscribeReq.getTopicNames()) {
final String pipeName =
PipeStaticMeta.generateSubscriptionPipeName(topicName,
consumerGroupId);
- // check username
- if (!consumerGroupMeta.allowSubscribeTopicForConsumer(topicName,
consumerId)) {
- final String exceptionMessage =
- String.format(
- "Failed to subscribe topic %s for consumer %s because
inconsistent username under the same consumer group",
- topicName, consumerId);
- LOGGER.warn(exceptionMessage);
- throw new SubscriptionException(exceptionMessage);
- }
-
if (!subscriptionInfo.get().isTopicSubscribedByConsumerGroup(topicName,
consumerGroupId)
// even if there existed subscription meta, if there is no
corresponding pipe meta, it
// will try to create the pipe
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index d8e680b432c..7699da64050 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -575,7 +575,11 @@ public class InformationSchemaContentSupplierFactory {
super(dataTypes);
try (final ConfigNodeClient client =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
- iterator = client.showTopic(new
TShowTopicReq()).getTopicInfoList().iterator();
+ iterator =
+ client
+ .showTopic(new TShowTopicReq().setIsTableModel(true))
+ .getTopicInfoList()
+ .iterator();
} catch (final Exception e) {
lastException = e;
}
@@ -606,7 +610,7 @@ public class InformationSchemaContentSupplierFactory {
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
iterator =
client
- .showSubscription(new TShowSubscriptionReq())
+ .showSubscription(new
TShowSubscriptionReq().setIsTableModel(true))
.getSubscriptionInfoList()
.iterator();
} catch (final Exception e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 23b065b88c9..a4e36bc3837 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2357,6 +2357,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
if (showSubscriptionsStatement.getTopicName() != null) {
showSubscriptionReq.setTopicName(showSubscriptionsStatement.getTopicName());
}
+
showSubscriptionReq.setIsTableModel(showSubscriptionsStatement.isTableModel());
final TShowSubscriptionResp showSubscriptionResp =
configNodeClient.showSubscription(showSubscriptionReq);
@@ -2456,7 +2457,8 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
configNodeClient.dropTopicExtended(
new TDropTopicReq()
.setIfExistsCondition(dropTopicStatement.hasIfExistsCondition())
- .setTopicName(dropTopicStatement.getTopicName()));
+ .setTopicName(dropTopicStatement.getTopicName())
+ .setIsTableModel(dropTopicStatement.isTableModel()));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
future.setException(new IoTDBException(tsStatus.message,
tsStatus.code));
} else {
@@ -2478,6 +2480,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
if (showTopicsStatement.getTopicName() != null) {
showTopicReq.setTopicName(showTopicsStatement.getTopicName());
}
+ showTopicReq.setIsTableModel(showTopicsStatement.isTableModel());
final TShowTopicResp showTopicResp =
configNodeClient.showTopic(showTopicReq);
if (showTopicResp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
index fe28c387bf7..d7ff17d2873 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
@@ -39,6 +39,7 @@ public class DropTopicTask implements IConfigTask {
this.dropTopicStatement = new DropTopicStatement();
this.dropTopicStatement.setTopicName(dropTopic.getTopicName());
this.dropTopicStatement.setIfExists(dropTopic.hasIfExistsCondition());
+ this.dropTopicStatement.setTableModel(true);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
index b421a25e3ce..c59a5d7b902 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
@@ -50,6 +50,7 @@ public class ShowSubscriptionsTask implements IConfigTask {
public ShowSubscriptionsTask(final ShowSubscriptions showSubscriptions) {
this.showSubscriptionsStatement = new ShowSubscriptionsStatement();
this.showSubscriptionsStatement.setTopicName(showSubscriptions.getTopicName());
+ this.showSubscriptionsStatement.setTableModel(true);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
index 55578168d19..1871ef66a50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
@@ -50,6 +50,7 @@ public class ShowTopicsTask implements IConfigTask {
public ShowTopicsTask(final ShowTopics showTopics) {
this.showTopicsStatement = new ShowTopicsStatement();
this.showTopicsStatement.setTopicName(showTopics.getTopicName());
+ this.showTopicsStatement.setTableModel(true);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
index 97d173b9fdd..3bc5d866bc4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
@@ -37,6 +37,7 @@ public class DropTopicStatement extends Statement implements
IConfigStatement {
private String topicName;
private boolean ifExistsCondition;
+ private boolean isTableModel;
public DropTopicStatement() {
super();
@@ -51,14 +52,22 @@ public class DropTopicStatement extends Statement
implements IConfigStatement {
return ifExistsCondition;
}
- public void setTopicName(String topicName) {
+ public boolean isTableModel() {
+ return isTableModel;
+ }
+
+ public void setTopicName(final String topicName) {
this.topicName = topicName;
}
- public void setIfExists(boolean ifExistsCondition) {
+ public void setIfExists(final boolean ifExistsCondition) {
this.ifExistsCondition = ifExistsCondition;
}
+ public void setTableModel(final boolean tableModel) {
+ this.isTableModel = tableModel;
+ }
+
@Override
public QueryType getQueryType() {
return QueryType.WRITE;
@@ -70,12 +79,12 @@ public class DropTopicStatement extends Statement
implements IConfigStatement {
}
@Override
- public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ public <R, C> R accept(final StatementVisitor<R, C> visitor, final C
context) {
return visitor.visitDropTopic(this, context);
}
@Override
- public TSStatus checkPermissionBeforeProcess(String userName) {
+ public TSStatus checkPermissionBeforeProcess(final String userName) {
if (AuthorityChecker.SUPER_USER.equals(userName)) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowSubscriptionsStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowSubscriptionsStatement.java
index ff7267f9881..9bccb3270ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowSubscriptionsStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowSubscriptionsStatement.java
@@ -36,6 +36,7 @@ import java.util.List;
public class ShowSubscriptionsStatement extends Statement implements
IConfigStatement {
private String topicName;
+ private boolean isTableModel;
public ShowSubscriptionsStatement() {
super();
@@ -46,12 +47,20 @@ public class ShowSubscriptionsStatement extends Statement
implements IConfigStat
return topicName;
}
- public void setTopicName(String topicName) {
+ public boolean isTableModel() {
+ return isTableModel;
+ }
+
+ public void setTopicName(final String topicName) {
this.topicName = topicName;
}
+ public void setTableModel(final boolean tableModel) {
+ this.isTableModel = tableModel;
+ }
+
@Override
- public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ public <R, C> R accept(final StatementVisitor<R, C> visitor, final C
context) {
return visitor.visitShowSubscriptions(this, context);
}
@@ -66,7 +75,7 @@ public class ShowSubscriptionsStatement extends Statement
implements IConfigStat
}
@Override
- public TSStatus checkPermissionBeforeProcess(String userName) {
+ public TSStatus checkPermissionBeforeProcess(final String userName) {
if (AuthorityChecker.SUPER_USER.equals(userName)) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowTopicsStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowTopicsStatement.java
index 82a4a87dc93..eee05092407 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowTopicsStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/ShowTopicsStatement.java
@@ -36,6 +36,7 @@ import java.util.List;
public class ShowTopicsStatement extends Statement implements IConfigStatement
{
private String topicName;
+ private boolean isTableModel;
public ShowTopicsStatement() {
super();
@@ -46,12 +47,20 @@ public class ShowTopicsStatement extends Statement
implements IConfigStatement {
return topicName;
}
- public void setTopicName(String topicName) {
+ public boolean isTableModel() {
+ return isTableModel;
+ }
+
+ public void setTopicName(final String topicName) {
this.topicName = topicName;
}
+ public void setTableModel(final boolean tableModel) {
+ this.isTableModel = tableModel;
+ }
+
@Override
- public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ public <R, C> R accept(final StatementVisitor<R, C> visitor, final C
context) {
return visitor.visitShowTopics(this, context);
}
@@ -66,7 +75,7 @@ public class ShowTopicsStatement extends Statement implements
IConfigStatement {
}
@Override
- public TSStatus checkPermissionBeforeProcess(String userName) {
+ public TSStatus checkPermissionBeforeProcess(final String userName) {
if (AuthorityChecker.SUPER_USER.equals(userName)) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index fe9e1345a17..dae3a5dc97c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -112,6 +112,8 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
private final ThreadLocal<ConsumerConfig> consumerConfigThreadLocal = new
ThreadLocal<>();
private final ThreadLocal<PollTimer> pollTimerThreadLocal = new
ThreadLocal<>();
+ private static final String SQL_DIALECT_TABLE_VALUE = "table";
+
@Override
public final TPipeSubscribeResp handle(final TPipeSubscribeReq req) {
final short reqType = req.getType();
@@ -749,7 +751,9 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
new TSubscribeReq()
.setConsumerId(consumerConfig.getConsumerId())
.setConsumerGroupId(consumerConfig.getConsumerGroupId())
- .setTopicNames(topicNames);
+ .setTopicNames(topicNames)
+ .setIsTableModel(
+ Objects.equals(consumerConfig.getSqlDialect(),
SQL_DIALECT_TABLE_VALUE));
final TSStatus tsStatus = configNodeClient.createSubscription(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
@@ -789,7 +793,9 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
new TUnsubscribeReq()
.setConsumerId(consumerConfig.getConsumerId())
.setConsumerGroupId(consumerConfig.getConsumerGroupId())
- .setTopicNames(topicNames);
+ .setTopicNames(topicNames)
+ .setIsTableModel(
+ Objects.equals(consumerConfig.getSqlDialect(),
SQL_DIALECT_TABLE_VALUE));
final TSStatus tsStatus = configNodeClient.dropSubscription(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtils.java
index 5d44fa1379c..94d7ea92da6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtils.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.rpc.subscription.config.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,4 +131,13 @@ public class VisibilityUtils {
extractorParameters.getAttribute());
return Visibility.NONE;
}
+
+ public static Visibility calculateFromTopicConfig(final TopicConfig config) {
+ final boolean isTreeDialect =
+ config
+ .getStringOrDefault(
+ SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE)
+ .equals(SystemConstant.SQL_DIALECT_TREE_VALUE);
+ return !isTreeDialect ? Visibility.TABLE_ONLY : Visibility.TREE_ONLY;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
index 91bbb461223..36935053c92 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.commons.subscription.meta.subscription;
+import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
+
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -33,7 +35,7 @@ import java.util.Set;
/** SubscriptionMeta is created for show subscription and is not stored in
meta keeper. */
public class SubscriptionMeta {
- private String topicName;
+ private TopicMeta topicMeta;
private String consumerGroupId;
private Set<String> consumerIds;
@@ -41,14 +43,14 @@ public class SubscriptionMeta {
// Empty constructor
}
- public SubscriptionMeta(String topicName, String consumerGroupId,
Set<String> consumerIds) {
- this.topicName = topicName;
+ public SubscriptionMeta(TopicMeta topicMeta, String consumerGroupId,
Set<String> consumerIds) {
+ this.topicMeta = topicMeta;
this.consumerGroupId = consumerGroupId;
this.consumerIds = consumerIds;
}
- public String getTopicName() {
- return topicName;
+ public TopicMeta getTopicMeta() {
+ return topicMeta;
}
public String getConsumerGroupId() {
@@ -67,7 +69,7 @@ public class SubscriptionMeta {
}
public void serialize(DataOutputStream outputStream) throws IOException {
- ReadWriteIOUtils.write(topicName, outputStream);
+ topicMeta.serialize(outputStream);
ReadWriteIOUtils.write(consumerGroupId, outputStream);
ReadWriteIOUtils.write(consumerIds.size(), outputStream);
@@ -77,7 +79,7 @@ public class SubscriptionMeta {
}
public void serialize(FileOutputStream outputStream) throws IOException {
- ReadWriteIOUtils.write(topicName, outputStream);
+ topicMeta.serialize(outputStream);
ReadWriteIOUtils.write(consumerGroupId, outputStream);
ReadWriteIOUtils.write(consumerIds.size(), outputStream);
@@ -89,7 +91,7 @@ public class SubscriptionMeta {
public static SubscriptionMeta deserialize(InputStream inputStream) throws
IOException {
final SubscriptionMeta subscriptionMeta = new SubscriptionMeta();
- subscriptionMeta.topicName = ReadWriteIOUtils.readString(inputStream);
+ subscriptionMeta.topicMeta = TopicMeta.deserialize(inputStream);
subscriptionMeta.consumerGroupId =
ReadWriteIOUtils.readString(inputStream);
subscriptionMeta.consumerIds = new HashSet<>();
@@ -104,7 +106,7 @@ public class SubscriptionMeta {
public static SubscriptionMeta deserialize(ByteBuffer byteBuffer) {
final SubscriptionMeta subscriptionMeta = new SubscriptionMeta();
- subscriptionMeta.topicName = ReadWriteIOUtils.readString(byteBuffer);
+ subscriptionMeta.topicMeta = TopicMeta.deserialize(byteBuffer);
subscriptionMeta.consumerGroupId = ReadWriteIOUtils.readString(byteBuffer);
subscriptionMeta.consumerIds = new HashSet<>();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 3a092d9f80f..95ba6fd85c1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.commons.subscription.meta.topic;
import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
+import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.rpc.subscription.config.TopicConfig;
@@ -227,6 +229,13 @@ public class TopicMeta {
return connectorAttributes;
}
+ ///////////////////////////////// Tree & Table Isolation
/////////////////////////////////
+
+ public boolean visibleUnder(final boolean isTableModel) {
+ final Visibility visibility =
VisibilityUtils.calculateFromTopicConfig(config);
+ return VisibilityUtils.isCompatible(visibility, isTableModel);
+ }
+
////////////////////////////////////// Object
////////////////////////////////
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMetaKeeper.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMetaKeeper.java
index 42341da441d..741f7c38058 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMetaKeeper.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMetaKeeper.java
@@ -80,6 +80,14 @@ public class TopicMetaKeeper {
return topicNameToTopicMetaMap.containsKey(topicName);
}
+ public boolean containsTopicMeta(String topicName, boolean isTableModel) {
+ TopicMeta topicMeta = topicNameToTopicMetaMap.get(topicName);
+ if (Objects.isNull(topicMeta)) {
+ return false;
+ }
+ return topicMeta.visibleUnder(isTableModel);
+ }
+
public void clear() {
this.topicNameToTopicMetaMap.clear();
}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 78abd9b6b61..5bff9167763 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -888,10 +888,12 @@ struct TCreateTopicReq {
struct TDropTopicReq {
1: required string topicName
2: optional bool ifExistsCondition
+ 3: optional bool isTableModel
}
struct TShowTopicReq {
1: optional string topicName
+ 2: optional bool isTableModel
}
struct TShowTopicResp {
@@ -933,16 +935,19 @@ struct TSubscribeReq {
1: required string consumerId
2: required string consumerGroupId
3: required set<string> topicNames
+ 4: optional bool isTableModel
}
struct TUnsubscribeReq {
1: required string consumerId
2: required string consumerGroupId
3: required set<string> topicNames
+ 4: optional bool isTableModel
}
struct TShowSubscriptionReq {
1: optional string topicName
+ 2: optional bool isTableModel
}
struct TShowSubscriptionResp {