This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch dev/1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push: new fdf847cc19f Subscription: support drop subscription from session & intro allTopicMessagesHaveBeenConsumed for pull consumer (#15486) (#15568) fdf847cc19f is described below commit fdf847cc19f21fa1d0a608e76a570ce6ebc65eb7 Author: VGalaxies <vgalax...@apache.org> AuthorDate: Fri May 23 14:20:10 2025 +0800 Subscription: support drop subscription from session & intro allTopicMessagesHaveBeenConsumed for pull consumer (#15486) (#15568) - Introducing a new DropSubscriptionTask and corresponding updates across executor interfaces, cluster executors, and managers. - Updating the SQL parser to recognize the DROP SUBSCRIPTION statement. - Enhancing client session APIs and internal data models to support subscription deletion. --- .../apache/iotdb/SubscriptionSessionExample.java | 4 +- .../it/dual/IoTDBSubscriptionTopicIT.java | 2 +- .../it/local/IoTDBSubscriptionBasicIT.java | 97 ++++++++++++++++++++++ .../session/subscription/SubscriptionSession.java | 19 ++++- .../consumer/SubscriptionConsumer.java | 5 ++ .../session/subscription/model/Subscription.java | 16 +++- .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 | 1 + .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 12 +-- .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 4 + .../subscription/SubscriptionTableResp.java | 34 ++++---- .../iotdb/confignode/manager/ConfigManager.java | 9 ++ .../apache/iotdb/confignode/manager/IManager.java | 3 + .../subscription/SubscriptionCoordinator.java | 27 ++++++ .../persistence/subscription/SubscriptionInfo.java | 56 ++++++++++--- .../thrift/ConfigNodeRPCServiceProcessor.java | 6 ++ .../org/apache/iotdb/db/audit/AuditLogger.java | 1 + .../iotdb/db/protocol/client/ConfigNodeClient.java | 7 ++ .../common/header/ColumnHeaderConstant.java | 2 + .../plan/execution/config/ConfigTaskVisitor.java | 22 +++-- .../config/executor/ClusterConfigTaskExecutor.java | 27 +++++- .../config/executor/IConfigTaskExecutor.java | 4 + .../sys/subscription/DropSubscriptionTask.java | 42 ++++++++++ ...riptionTask.java => ShowSubscriptionsTask.java} | 17 +++- .../db/queryengine/plan/parser/ASTVisitor.java | 17 ++++ .../queryengine/plan/statement/StatementType.java | 3 +- .../plan/statement/StatementVisitor.java | 5 ++ .../subscription/DropSubscriptionStatement.java | 86 +++++++++++++++++++ iotdb-core/node-commons/pom.xml | 4 + .../meta/consumer/ConsumerGroupMeta.java | 95 ++++++++++++++++++--- .../meta/consumer/ConsumerGroupMetaKeeper.java | 7 ++ .../meta/subscription/SubscriptionMeta.java | 84 ++++--------------- .../src/main/thrift/confignode.thrift | 11 ++- 32 files changed, 601 insertions(+), 128 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java index 1ec74a34a29..c01797982e0 100644 --- a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java @@ -268,7 +268,7 @@ public class SubscriptionSessionExample { .buildPushConsumer()) { consumer3.open(); consumer3.subscribe(TOPIC_3); - while (!consumer3.allSnapshotTopicMessagesHaveBeenConsumed()) { + while (!consumer3.allTopicMessagesHaveBeenConsumed()) { LockSupport.parkNanos(SLEEP_NS); // wait some time } } @@ -309,7 +309,7 @@ public class SubscriptionSessionExample { .buildPullConsumer()) { consumer4.open(); consumer4.subscribe(TOPIC_4); - while (!consumer4.allSnapshotTopicMessagesHaveBeenConsumed()) { + while (!consumer4.allTopicMessagesHaveBeenConsumed()) { for (final SubscriptionMessage message : consumer4.poll(POLL_TIMEOUT_MS)) { final SubscriptionTsFileHandler handler = message.getTsFileHandler(); handler.moveFile( diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java index 723090d433e..b046f09a6da 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java @@ -859,7 +859,7 @@ public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionDualIT { consumer.open(); consumer.subscribe(topicName); - while (!consumer.allSnapshotTopicMessagesHaveBeenConsumed()) { + while (!consumer.allTopicMessagesHaveBeenConsumed()) { LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); // poll and ignore } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java index 014b38ec25b..a6750b0ff19 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java @@ -19,10 +19,14 @@ package org.apache.iotdb.subscription.it.local; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp; import org.apache.iotdb.isession.ISession; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.iotdb.session.subscription.SubscriptionSession; import org.apache.iotdb.session.subscription.consumer.AckStrategy; @@ -30,6 +34,7 @@ import org.apache.iotdb.session.subscription.consumer.AsyncCommitCallback; import org.apache.iotdb.session.subscription.consumer.ConsumeResult; import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer; +import org.apache.iotdb.session.subscription.model.Subscription; import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet; import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; @@ -51,6 +56,7 @@ import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -621,4 +627,95 @@ public class IoTDBSubscriptionBasicIT extends AbstractSubscriptionLocalIT { fail(e.getMessage()); } } + + @Test + public void testDropSubscriptionBySession() throws Exception { + // Insert some historical data + try (final ISession session = EnvFactory.getEnv().getSessionConnection()) { + for (int i = 0; i < 100; ++i) { + session.executeNonQueryStatement( + String.format("insert into root.db.d1(time, s1) values (%s, 1)", i)); + } + session.executeNonQueryStatement("flush"); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + // Create topic + final String topicName = "topic8"; + final String host = EnvFactory.getEnv().getIP(); + final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); + try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + session.createTopic(topicName); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + // Subscription + final Thread thread = + new Thread( + () -> { + try (final SubscriptionPullConsumer consumer = + new SubscriptionPullConsumer.Builder() + .host(host) + .port(port) + .consumerId("c1") + .consumerGroupId("cg1") + .autoCommit(true) + .buildPullConsumer()) { + consumer.open(); + consumer.subscribe(topicName); + + while (!consumer.allTopicMessagesHaveBeenConsumed()) { + LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time + consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); // poll and ignore + } + } catch (final Exception e) { + e.printStackTrace(); + // Avoid failure + } finally { + LOGGER.info("consumer exiting..."); + } + }, + String.format("%s - consumer", testName.getDisplayName())); + thread.start(); + + // Drop Subscription + LockSupport.parkNanos(5_000_000_000L); // wait some time + try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + final Set<Subscription> subscriptions = session.getSubscriptions(topicName); + Assert.assertEquals(1, subscriptions.size()); + session.dropSubscription(subscriptions.iterator().next().getSubscriptionId()); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try { + // Keep retrying if there are execution failures + AWAIT.untilAsserted( + () -> { + // Check empty subscription + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) + EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + final TShowSubscriptionResp showSubscriptionResp = + client.showSubscription(new TShowSubscriptionReq()); + Assert.assertEquals( + RpcUtils.SUCCESS_STATUS.getCode(), showSubscriptionResp.status.getCode()); + Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList); + Assert.assertEquals(0, showSubscriptionResp.subscriptionInfoList.size()); + } + }); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + thread.join(); + } + } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java index 68e2ac0028c..2f2bc40d951 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java @@ -266,6 +266,20 @@ public class SubscriptionSession extends Session { } } + public void dropSubscription(final String subscriptionId) + throws IoTDBConnectionException, StatementExecutionException { + IdentifierUtils.checkAndParseIdentifier(subscriptionId); // ignore the parse result + final String sql = String.format("DROP SUBSCRIPTION %s", subscriptionId); + executeNonQueryStatement(sql); + } + + public void dropSubscriptionIfExists(final String subscriptionId) + throws IoTDBConnectionException, StatementExecutionException { + IdentifierUtils.checkAndParseIdentifier(subscriptionId); // ignore the parse result + final String sql = String.format("DROP SUBSCRIPTION IF EXISTS %s", subscriptionId); + executeNonQueryStatement(sql); + } + /////////////////////////////// utility /////////////////////////////// public Set<Topic> convertDataSetToTopics(final SessionDataSet dataSet) @@ -291,7 +305,7 @@ public class SubscriptionSession extends Session { while (dataSet.hasNext()) { final RowRecord record = dataSet.next(); final List<Field> fields = record.getFields(); - if (fields.size() != 3) { + if (fields.size() != 4) { throw new SubscriptionException( String.format( "Unexpected fields %s was obtained during SHOW SUBSCRIPTION...", @@ -301,7 +315,8 @@ public class SubscriptionSession extends Session { new Subscription( fields.get(0).getStringValue(), fields.get(1).getStringValue(), - fields.get(2).getStringValue())); + fields.get(2).getStringValue(), + fields.get(3).getStringValue())); } return subscriptions; } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java index 8cbe1b23cb3..7add29b4d7c 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java @@ -123,10 +123,15 @@ abstract class SubscriptionConsumer implements AutoCloseable { @SuppressWarnings("java:S3077") protected volatile Map<String, TopicConfig> subscribedTopics = new HashMap<>(); + @Deprecated public boolean allSnapshotTopicMessagesHaveBeenConsumed() { return allTopicMessagesHaveBeenConsumed(subscribedTopics.keySet()); } + public boolean allTopicMessagesHaveBeenConsumed() { + return allTopicMessagesHaveBeenConsumed(subscribedTopics.keySet()); + } + private boolean allTopicMessagesHaveBeenConsumed(final Collection<String> topicNames) { // For the topic that needs to be detected, there are two scenarios to consider: // 1. If configs as live, it cannot be determined whether the topic has been fully consumed. diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java index e5f227be17b..01e454cae2c 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java @@ -21,16 +21,26 @@ package org.apache.iotdb.session.subscription.model; public class Subscription { + private final String subscriptionId; private final String topicName; private final String consumerGroupId; private final String consumerIds; - public Subscription(String topicName, String consumerGroupId, String consumerIds) { + public Subscription( + final String subscriptionId, + final String topicName, + final String consumerGroupId, + final String consumerIds) { + this.subscriptionId = subscriptionId; this.topicName = topicName; this.consumerGroupId = consumerGroupId; this.consumerIds = consumerIds; } + public String getSubscriptionId() { + return subscriptionId; + } + public String getTopicName() { return topicName; } @@ -45,7 +55,9 @@ public class Subscription { @Override public String toString() { - return "Subscription{topicName=" + return "Subscription{subscriptionId=" + + subscriptionId + + ", topicName=" + topicName + ", consumerGroupId=" + consumerGroupId diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 index 04f603857a4..632911fb8af 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 @@ -221,6 +221,7 @@ keyWords | STATELESS | STATEMENT | STOP + | SUBSCRIPTION | SUBSCRIPTIONS | SUBSTRING | SYSTEM diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index dc88779f76e..02b5062cb02 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -56,10 +56,8 @@ ddlStatement | createPipe | alterPipe | dropPipe | startPipe | stopPipe | showPipes // Pipe Plugin | createPipePlugin | dropPipePlugin | showPipePlugins - // TOPIC - | createTopic | dropTopic | showTopics // Subscription - | showSubscriptions + | createTopic | dropTopic | showTopics | showSubscriptions | dropSubscription // CQ | createContinuousQuery | dropContinuousQuery | showContinuousQueries // Cluster @@ -667,7 +665,8 @@ showPipePlugins : SHOW PIPEPLUGINS ; -// Topic ========================================================================================= + +// Subscription ========================================================================================= createTopic : CREATE TOPIC (IF NOT EXISTS)? topicName=identifier topicAttributesClause? ; @@ -688,11 +687,14 @@ showTopics : SHOW ((TOPIC topicName=identifier) | TOPICS ) ; -// Subscriptions ========================================================================================= showSubscriptions : SHOW SUBSCRIPTIONS (ON topicName=identifier)? ; +dropSubscription + : DROP SUBSCRIPTION (IF EXISTS)? subscriptionId=identifier + ; + // AI Model ========================================================================================= // ---- Create Model createModel diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 index 10d0da69de4..f90a0d3dbfd 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 @@ -790,6 +790,10 @@ STOP : S T O P ; +SUBSCRIPTION + : S U B S C R I P T I O N + ; + SUBSCRIPTIONS : S U B S C R I P T I O N S ; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java index d6679ea4c11..e3c336cb5b8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java @@ -31,6 +31,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; public class SubscriptionTableResp implements DataSet { private final TSStatus status; @@ -47,29 +50,30 @@ public class SubscriptionTableResp implements DataSet { } public SubscriptionTableResp filter(String topicName) { - if (topicName == null) { - return this; - } else { - final List<SubscriptionMeta> filteredSubscriptionMeta = new ArrayList<>(); - for (SubscriptionMeta subscriptionMeta : allSubscriptionMeta) { - if (subscriptionMeta.getTopicName().equals(topicName)) { - filteredSubscriptionMeta.add(subscriptionMeta); - break; - } - } - return new SubscriptionTableResp(status, filteredSubscriptionMeta, allConsumerGroupMeta); - } + return new SubscriptionTableResp( + status, + allSubscriptionMeta.stream() + .filter( + subscriptionMeta -> + (Objects.isNull(topicName) + || Objects.equals( + subscriptionMeta.getTopicMeta().getTopicName(), topicName))) + .collect(Collectors.toList()), + allConsumerGroupMeta); } public TShowSubscriptionResp convertToTShowSubscriptionResp() { final List<TShowSubscriptionInfo> showSubscriptionInfoList = new ArrayList<>(); for (SubscriptionMeta subscriptionMeta : allSubscriptionMeta) { - showSubscriptionInfoList.add( + TShowSubscriptionInfo showSubscriptionInfo = new TShowSubscriptionInfo( - subscriptionMeta.getTopicName(), + subscriptionMeta.getTopicMeta().getTopicName(), subscriptionMeta.getConsumerGroupId(), - subscriptionMeta.getConsumerIds())); + subscriptionMeta.getConsumerIds()); + Optional<Long> creationTime = subscriptionMeta.getCreationTime(); + creationTime.ifPresent(showSubscriptionInfo::setCreationTime); + showSubscriptionInfoList.add(showSubscriptionInfo); } return new TShowSubscriptionResp(status).setSubscriptionInfoList(showSubscriptionInfoList); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 2e831747c9e..0a207ab7238 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -161,6 +161,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq; @@ -2241,6 +2242,14 @@ public class ConfigManager implements IManager { : status; } + @Override + public TSStatus dropSubscriptionById(TDropSubscriptionReq req) { + TSStatus status = confirmLeader(); + return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + ? subscriptionManager.getSubscriptionCoordinator().dropSubscription(req) + : status; + } + @Override public TShowSubscriptionResp showSubscription(TShowSubscriptionReq req) { TSStatus status = confirmLeader(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 6fd0a8b567c..657f068e674 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -87,6 +87,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq; @@ -752,6 +753,8 @@ public interface IManager { TSStatus dropSubscription(TUnsubscribeReq req); + TSStatus dropSubscriptionById(TDropSubscriptionReq req); + TShowSubscriptionResp showSubscription(TShowSubscriptionReq req); TGetAllSubscriptionInfoResp getAllSubscriptionInfo(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java index 20dfce44bf2..28c596de7ba 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java @@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo; import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllTopicInfoResp; @@ -42,10 +43,12 @@ import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; public class SubscriptionCoordinator { @@ -248,6 +251,30 @@ public class SubscriptionCoordinator { return status; } + public TSStatus dropSubscription(TDropSubscriptionReq req) { + final String subscriptionId = req.getSubsciptionId(); + final boolean isSetIfExistsCondition = + req.isSetIfExistsCondition() && req.isIfExistsCondition(); + final Optional<Pair<String, String>> topicNameWithConsumerGroupName = + subscriptionInfo.parseSubscriptionId(subscriptionId); + if (!topicNameWithConsumerGroupName.isPresent()) { + return isSetIfExistsCondition + ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS) + : RpcUtils.getStatus( + TSStatusCode.TOPIC_NOT_EXIST_ERROR, + String.format( + "Failed to drop subscription %s. Failures: %s does not exist.", + subscriptionId, subscriptionId)); + } + return configManager + .getProcedureManager() + .dropSubscription( + new TUnsubscribeReq() + .setConsumerId(null) + .setConsumerGroupId(topicNameWithConsumerGroupName.get().right) + .setTopicNames(Collections.singleton(topicNameWithConsumerGroupName.get().left))); + } + public TShowSubscriptionResp showSubscription(TShowSubscriptionReq req) { try { return ((SubscriptionTableResp) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java index 2ccde563866..9a1c6acc72a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java @@ -44,6 +44,8 @@ import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; +import org.apache.thrift.annotation.Nullable; +import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,9 +56,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -603,15 +607,18 @@ public class SubscriptionInfo implements SnapshotProcessor { private void checkBeforeUnsubscribeInternal(TUnsubscribeReq unsubscribeReq) throws SubscriptionException { // 1. Check if the consumer exists - if (!isConsumerExisted(unsubscribeReq.getConsumerGroupId(), unsubscribeReq.getConsumerId())) { - // There is no consumer with the same consumerId and consumerGroupId, - // we should end the procedure - final String exceptionMessage = - String.format( - "Failed to unsubscribe because the consumer %s in consumer group %s does not exist", - unsubscribeReq.getConsumerId(), unsubscribeReq.getConsumerGroupId()); - LOGGER.warn(exceptionMessage); - throw new SubscriptionException(exceptionMessage); + // NOTE: consumer id may be null if drop subscription by session + if (Objects.nonNull(unsubscribeReq.getConsumerId())) { + if (!isConsumerExisted(unsubscribeReq.getConsumerGroupId(), unsubscribeReq.getConsumerId())) { + // There is no consumer with the same consumerId and consumerGroupId, + // we should end the procedure + final String exceptionMessage = + String.format( + "Failed to unsubscribe because the consumer %s in consumer group %s does not exist", + unsubscribeReq.getConsumerId(), unsubscribeReq.getConsumerGroupId()); + LOGGER.warn(exceptionMessage); + throw new SubscriptionException(exceptionMessage); + } } // 2. Check if all topics exist. No need to check if already subscribed. @@ -638,17 +645,28 @@ public class SubscriptionInfo implements SnapshotProcessor { } private List<SubscriptionMeta> getAllSubscriptionMeta() { + return getAllSubscriptionMetaInternal(null); + } + + private List<SubscriptionMeta> getAllSubscriptionMetaInternal( + @Nullable Predicate<TopicMeta> predicate) { List<SubscriptionMeta> allSubscriptions = new ArrayList<>(); for (TopicMeta topicMeta : topicMetaKeeper.getAllTopicMeta()) { + if (Objects.nonNull(predicate) && !predicate.test(topicMeta)) { + continue; + } for (String consumerGroupId : consumerGroupMetaKeeper.getSubscribedConsumerGroupIds(topicMeta.getTopicName())) { Set<String> subscribedConsumerIDs = consumerGroupMetaKeeper.getConsumersSubscribingTopic( consumerGroupId, topicMeta.getTopicName()); + Optional<Long> creationTime = + consumerGroupMetaKeeper.getSubscriptionCreationTime( + consumerGroupId, topicMeta.getTopicName()); if (!subscribedConsumerIDs.isEmpty()) { allSubscriptions.add( new SubscriptionMeta( - topicMeta.getTopicName(), consumerGroupId, subscribedConsumerIDs)); + topicMeta, consumerGroupId, subscribedConsumerIDs, creationTime.orElse(null))); } } } @@ -661,6 +679,24 @@ public class SubscriptionInfo implements SnapshotProcessor { .collect(Collectors.toList()); } + public Optional<Pair<String, String>> parseSubscriptionId(String subscriptionId) { + acquireReadLock(); + try { + List<SubscriptionMeta> allSubscriptions = getAllSubscriptionMetaInternal(null); + for (SubscriptionMeta subscriptionMeta : allSubscriptions) { + if (Objects.equals(subscriptionId, subscriptionMeta.getSubscriptionId())) { + return Optional.of( + new Pair<>( + subscriptionMeta.getTopicMeta().getTopicName(), + subscriptionMeta.getConsumerGroupId())); + } + } + return Optional.empty(); + } finally { + releaseReadLock(); + } + } + ///////////////////////////////// Snapshot ///////////////////////////////// @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 0c2d0633c76..0daf96f0d25 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -130,6 +130,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq; @@ -1182,6 +1183,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac return configManager.dropSubscription(req); } + @Override + public TSStatus dropSubscriptionById(TDropSubscriptionReq req) { + return configManager.dropSubscriptionById(req); + } + @Override public TShowSubscriptionResp showSubscription(TShowSubscriptionReq req) { return configManager.showSubscription(req); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java index bb5f557ec0e..ed3efee9a1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java @@ -210,6 +210,7 @@ public class AuditLogger { case RENAME_LOGICAL_VIEW: case CREATE_TOPIC: case DROP_TOPIC: + case DROP_SUBSCRIPTION: return AuditLogOperation.DDL; case LOAD_DATA: case INSERT: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index f809c92be5c..979c18320ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -92,6 +92,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq; @@ -1113,6 +1114,12 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie () -> client.dropSubscription(req), status -> !updateConfigNodeLeader(status)); } + @Override + public TSStatus dropSubscriptionById(TDropSubscriptionReq req) throws TException { + return executeRemoteCallWithRetry( + () -> client.dropSubscriptionById(req), status -> !updateConfigNodeLeader(status)); + } + @Override public TShowSubscriptionResp showSubscription(TShowSubscriptionReq req) throws TException { return executeRemoteCallWithRetry( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java index 18c3908f13d..0f26c42e39d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java @@ -127,6 +127,7 @@ public class ColumnHeaderConstant { // column names for show subscriptions statement public static final String CONSUMER_GROUP_NAME = "ConsumerGroupName"; public static final String SUBSCRIBED_CONSUMERS = "SubscribedConsumers"; + public static final String SUBSCRIPTION_ID = "SubscriptionID"; // show cluster status public static final String NODE_TYPE_CONFIG_NODE = "ConfigNode"; @@ -455,6 +456,7 @@ public class ColumnHeaderConstant { public static final List<ColumnHeader> showSubscriptionColumnHeaders = ImmutableList.of( + new ColumnHeader(SUBSCRIPTION_ID, TSDataType.TEXT), new ColumnHeader(TOPIC_NAME, TSDataType.TEXT), new ColumnHeader(CONSUMER_GROUP_NAME, TSDataType.TEXT), new ColumnHeader(SUBSCRIBED_CONSUMERS, TSDataType.TEXT)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java index 21caaa6e25f..7a0556007ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java @@ -96,8 +96,9 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.SetThrott import org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpaceQuotaTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowThrottleQuotaTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.CreateTopicTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropSubscriptionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropTopicTask; -import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionsTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowTopicsTask; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; @@ -149,6 +150,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateReg import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement; @@ -516,12 +518,6 @@ public class ConfigTaskVisitor extends StatementVisitor<IConfigTask, MPPQueryCon return new StopPipeTask(stopPipeStatement); } - @Override - public IConfigTask visitShowSubscriptions( - ShowSubscriptionsStatement showSubscriptionsStatement, MPPQueryContext context) { - return new ShowSubscriptionTask(showSubscriptionsStatement); - } - public IConfigTask visitCreateTopic( CreateTopicStatement createTopicStatement, MPPQueryContext context) { return new CreateTopicTask(createTopicStatement); @@ -539,6 +535,18 @@ public class ConfigTaskVisitor extends StatementVisitor<IConfigTask, MPPQueryCon return new ShowTopicsTask(showTopicsStatement); } + @Override + public IConfigTask visitShowSubscriptions( + ShowSubscriptionsStatement showSubscriptionsStatement, MPPQueryContext context) { + return new ShowSubscriptionsTask(showSubscriptionsStatement); + } + + @Override + public IConfigTask visitDropSubscription( + DropSubscriptionStatement dropSubscriptionStatement, MPPQueryContext context) { + return new DropSubscriptionTask(dropSubscriptionStatement); + } + @Override public IConfigTask visitDeleteTimeSeries( DeleteTimeSeriesStatement deleteTimeSeriesStatement, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index d2b5d0bbe77..abdecf6ba73 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -84,6 +84,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq; @@ -173,7 +174,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.sys.TestConnectionT import org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.ShowPipeTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpaceQuotaTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowThrottleQuotaTask; -import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionsTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowTopicsTask; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor; @@ -212,6 +213,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateReg import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement; @@ -2098,7 +2100,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { return future; } - ShowSubscriptionTask.buildTSBlock( + ShowSubscriptionsTask.buildTSBlock( showSubscriptionResp.isSetSubscriptionInfoList() ? showSubscriptionResp.getSubscriptionInfoList() : Collections.emptyList(), @@ -2109,6 +2111,27 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { return future; } + public SettableFuture<ConfigTaskResult> dropSubscription( + final DropSubscriptionStatement dropSubscriptionStatement) { + final SettableFuture<ConfigTaskResult> future = SettableFuture.create(); + try (ConfigNodeClient configNodeClient = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TSStatus tsStatus = + configNodeClient.dropSubscriptionById( + new TDropSubscriptionReq() + .setSubsciptionId(dropSubscriptionStatement.getSubscriptionId()) + .setIfExistsCondition(dropSubscriptionStatement.hasIfExistsCondition())); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { + future.setException(new IoTDBException(tsStatus.message, tsStatus.code)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (Exception e) { + future.setException(e); + } + return future; + } + @Override public SettableFuture<ConfigTaskResult> createTopic(CreateTopicStatement createTopicStatement) { final SettableFuture<ConfigTaskResult> future = SettableFuture.create(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index e05457d9cf8..0b800a949a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -62,6 +62,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateReg import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement; @@ -195,6 +196,9 @@ public interface IConfigTaskExecutor { SettableFuture<ConfigTaskResult> showSubscriptions( ShowSubscriptionsStatement showSubscriptionsStatement); + SettableFuture<ConfigTaskResult> dropSubscription( + DropSubscriptionStatement dropSubscriptionStatement); + SettableFuture<ConfigTaskResult> createTopic(CreateTopicStatement createTopicStatement); SettableFuture<ConfigTaskResult> dropTopic(DropTopicStatement dropTopicStatement); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropSubscriptionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropSubscriptionTask.java new file mode 100644 index 00000000000..1c8725d9b43 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropSubscriptionTask.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription; + +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement; + +import com.google.common.util.concurrent.ListenableFuture; + +public class DropSubscriptionTask implements IConfigTask { + + private final DropSubscriptionStatement dropSubscriptionStatement; + + public DropSubscriptionTask(final DropSubscriptionStatement dropSubscriptionStatement) { + this.dropSubscriptionStatement = dropSubscriptionStatement; + } + + @Override + public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + return configTaskExecutor.dropSubscription(dropSubscriptionStatement); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java similarity index 85% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionTask.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java index 29fe87699f5..982e8e33e95 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java @@ -38,11 +38,11 @@ import org.apache.tsfile.utils.Binary; import java.util.List; import java.util.stream.Collectors; -public class ShowSubscriptionTask implements IConfigTask { +public class ShowSubscriptionsTask implements IConfigTask { private final ShowSubscriptionsStatement showSubscriptionsStatement; - public ShowSubscriptionTask(ShowSubscriptionsStatement showSubscriptionsStatement) { + public ShowSubscriptionsTask(ShowSubscriptionsStatement showSubscriptionsStatement) { this.showSubscriptionsStatement = showSubscriptionsStatement; } @@ -62,15 +62,24 @@ public class ShowSubscriptionTask implements IConfigTask { for (TShowSubscriptionInfo tSubscriptionInfo : subscriptionInfoList) { builder.getTimeColumnBuilder().writeLong(0L); + final StringBuilder subscriptionId = + new StringBuilder( + tSubscriptionInfo.getTopicName() + "_" + tSubscriptionInfo.getConsumerGroupId()); + if (tSubscriptionInfo.getCreationTime() != 0) { + subscriptionId.append("_").append(tSubscriptionInfo.getCreationTime()); + } builder .getColumnBuilder(0) - .writeBinary(new Binary(tSubscriptionInfo.getTopicName(), TSFileConfig.STRING_CHARSET)); + .writeBinary(new Binary(subscriptionId.toString(), TSFileConfig.STRING_CHARSET)); builder .getColumnBuilder(1) + .writeBinary(new Binary(tSubscriptionInfo.getTopicName(), TSFileConfig.STRING_CHARSET)); + builder + .getColumnBuilder(2) .writeBinary( new Binary(tSubscriptionInfo.getConsumerGroupId(), TSFileConfig.STRING_CHARSET)); builder - .getColumnBuilder(2) + .getColumnBuilder(3) .writeBinary( new Binary( tSubscriptionInfo.getConsumerIds().toString(), TSFileConfig.STRING_CHARSET)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 4b35f14fcfd..d33eaa953d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -186,6 +186,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateReg import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement; @@ -4037,6 +4038,22 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { return showSubscriptionsStatement; } + @Override + public Statement visitDropSubscription(IoTDBSqlParser.DropSubscriptionContext ctx) { + final DropSubscriptionStatement dropSubscriptionStatement = new DropSubscriptionStatement(); + + if (ctx.subscriptionId != null) { + dropSubscriptionStatement.setSubscriptionId(parseIdentifier(ctx.subscriptionId.getText())); + } else { + throw new SemanticException( + "Not support for this sql in DROP SUBSCRIPTION, please enter subscriptionId."); + } + + dropSubscriptionStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null); + + return dropSubscriptionStatement; + } + @Override public Statement visitGetRegionId(IoTDBSqlParser.GetRegionIdContext ctx) { TConsensusGroupType type = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java index 4f49a5909e7..4d5fe5dbf3f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java @@ -182,7 +182,8 @@ public enum StatementType { CREATE_TOPIC, DROP_TOPIC, SHOW_TOPICS, - SHOW_SUBSCRIPTIONS, + DROP_SUBSCRIPTION, + SET_CONFIGURATION } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java index 7af95f40a9a..1792d4eb268 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java @@ -93,6 +93,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateReg import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement; @@ -560,6 +561,10 @@ public abstract class StatementVisitor<R, C> { return visitStatement(showSubscriptionsStatement, context); } + public R visitDropSubscription(DropSubscriptionStatement dropSubscriptionStatement, C context) { + return visitStatement(dropSubscriptionStatement, context); + } + public R visitGetRegionId(GetRegionIdStatement getRegionIdStatement, C context) { return visitStatement(getRegionIdStatement, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropSubscriptionStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropSubscriptionStatement.java new file mode 100644 index 00000000000..73284fb6de5 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropSubscriptionStatement.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.auth.entity.PrivilegeType; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.auth.AuthorityChecker; +import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; +import org.apache.iotdb.rpc.TSStatusCode; + +import java.util.Collections; +import java.util.List; + +public class DropSubscriptionStatement extends Statement implements IConfigStatement { + + private String subscriptionId; + private boolean ifExistsCondition; + + public DropSubscriptionStatement() { + super(); + statementType = StatementType.DROP_SUBSCRIPTION; + } + + public String getSubscriptionId() { + return subscriptionId; + } + + public boolean hasIfExistsCondition() { + return ifExistsCondition; + } + + public void setSubscriptionId(String subscriptionId) { + this.subscriptionId = subscriptionId; + } + + public void setIfExists(boolean ifExistsCondition) { + this.ifExistsCondition = ifExistsCondition; + } + + @Override + public QueryType getQueryType() { + return QueryType.WRITE; + } + + @Override + public List<PartialPath> getPaths() { + return Collections.emptyList(); + } + + @Override + public <R, C> R accept(StatementVisitor<R, C> visitor, C context) { + return visitor.visitDropSubscription(this, context); + } + + @Override + public TSStatus checkPermissionBeforeProcess(String userName) { + if (AuthorityChecker.SUPER_USER.equals(userName)) { + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + return AuthorityChecker.getTSStatus( + AuthorityChecker.checkSystemPermission(userName, PrivilegeType.USE_PIPE.ordinal()), + PrivilegeType.USE_PIPE); + } +} diff --git a/iotdb-core/node-commons/pom.xml b/iotdb-core/node-commons/pom.xml index 3a5c64b9ecd..10d3c30d85c 100644 --- a/iotdb-core/node-commons/pom.xml +++ b/iotdb-core/node-commons/pom.xml @@ -167,6 +167,10 @@ <groupId>com.github.luben</groupId> <artifactId>zstd-jni</artifactId> </dependency> + <dependency> + <groupId>org.checkerframework</groupId> + <artifactId>checker-qual</artifactId> + </dependency> <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java index f7d4901884c..f8e486537f6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java @@ -21,8 +21,10 @@ package org.apache.iotdb.commons.subscription.meta.consumer; import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; +import org.apache.thrift.annotation.Nullable; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.checkerframework.checker.nullness.qual.NonNull; import java.io.DataOutputStream; import java.io.IOException; @@ -33,8 +35,10 @@ import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; public class ConsumerGroupMeta { @@ -42,10 +46,12 @@ public class ConsumerGroupMeta { private long creationTime; private Map<String, Set<String>> topicNameToSubscribedConsumerIdSet; private Map<String, ConsumerMeta> consumerIdToConsumerMeta; + private Map<String, Long> topicNameToSubscriptionCreationTime; // used when creationTime < 0 public ConsumerGroupMeta() { this.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>(); this.consumerIdToConsumerMeta = new ConcurrentHashMap<>(); + this.topicNameToSubscriptionCreationTime = new ConcurrentHashMap<>(); } public ConsumerGroupMeta( @@ -53,7 +59,7 @@ public class ConsumerGroupMeta { this(); this.consumerGroupId = consumerGroupId; - this.creationTime = creationTime; + this.creationTime = -creationTime; consumerIdToConsumerMeta.put(firstConsumerMeta.getConsumerId(), firstConsumerMeta); } @@ -65,6 +71,8 @@ public class ConsumerGroupMeta { copied.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>(topicNameToSubscribedConsumerIdSet); copied.consumerIdToConsumerMeta = new ConcurrentHashMap<>(consumerIdToConsumerMeta); + copied.topicNameToSubscriptionCreationTime = + new ConcurrentHashMap<>(topicNameToSubscriptionCreationTime); return copied; } @@ -73,7 +81,11 @@ public class ConsumerGroupMeta { } public long getCreationTime() { - return creationTime; + return Math.abs(creationTime); + } + + private boolean shouldRecordSubscriptionCreationTime() { + return creationTime < 0; } public static /* @NonNull */ Set<String> getTopicsUnsubByGroup( @@ -140,6 +152,12 @@ public class ConsumerGroupMeta { return topicNameToSubscribedConsumerIdSet.getOrDefault(topic, Collections.emptySet()); } + public Optional<Long> getSubscriptionTime(final String topic) { + return shouldRecordSubscriptionCreationTime() + ? Optional.ofNullable(topicNameToSubscriptionCreationTime.get(topic)) + : Optional.empty(); + } + public Set<String> getTopicsSubscribedByConsumer(final String consumerId) { final Set<String> topics = new HashSet<>(); for (final Map.Entry<String, Set<String>> topicNameToSubscribedConsumerId : @@ -169,15 +187,35 @@ public class ConsumerGroupMeta { for (final String topic : topics) { topicNameToSubscribedConsumerIdSet - .computeIfAbsent(topic, k -> new HashSet<>()) + .computeIfAbsent( + topic, + k -> { + if (shouldRecordSubscriptionCreationTime()) { + topicNameToSubscriptionCreationTime.put(topic, System.currentTimeMillis()); + } + return new HashSet<>(); + }) .add(consumerId); } } /** * @return topics subscribed by no consumers in this group after this removal. + * @param consumerId if null, remove subscriptions of topics for all consumers */ - public Set<String> removeSubscription(final String consumerId, final Set<String> topics) { + public Set<String> removeSubscription( + @Nullable final String consumerId, final Set<String> topics) { + if (Objects.isNull(consumerId)) { + return consumerIdToConsumerMeta.keySet().stream() + .map(id -> removeSubscriptionInternal(id, topics)) + .flatMap(Set::stream) + .collect(Collectors.toSet()); + } + return removeSubscriptionInternal(consumerId, topics); + } + + private Set<String> removeSubscriptionInternal( + @NonNull final String consumerId, final Set<String> topics) { if (!consumerIdToConsumerMeta.containsKey(consumerId)) { throw new SubscriptionException( String.format( @@ -190,8 +228,12 @@ public class ConsumerGroupMeta { if (topicNameToSubscribedConsumerIdSet.containsKey(topic)) { topicNameToSubscribedConsumerIdSet.get(topic).remove(consumerId); if (topicNameToSubscribedConsumerIdSet.get(topic).isEmpty()) { + // remove subscription for consumer group noSubscriptionTopicAfterRemoval.add(topic); topicNameToSubscribedConsumerIdSet.remove(topic); + if (shouldRecordSubscriptionCreationTime()) { + topicNameToSubscriptionCreationTime.remove(topic); + } } } } @@ -226,6 +268,14 @@ public class ConsumerGroupMeta { ReadWriteIOUtils.write(entry.getKey(), outputStream); entry.getValue().serialize(outputStream); } + + if (shouldRecordSubscriptionCreationTime()) { + ReadWriteIOUtils.write(topicNameToSubscriptionCreationTime.size(), outputStream); + for (final Map.Entry<String, Long> entry : topicNameToSubscriptionCreationTime.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), outputStream); + ReadWriteIOUtils.write(entry.getValue(), outputStream); + } + } } public static ConsumerGroupMeta deserialize(final InputStream inputStream) throws IOException { @@ -256,6 +306,16 @@ public class ConsumerGroupMeta { consumerGroupMeta.consumerIdToConsumerMeta.put(key, value); } + consumerGroupMeta.topicNameToSubscriptionCreationTime = new ConcurrentHashMap<>(); + if (consumerGroupMeta.shouldRecordSubscriptionCreationTime()) { + size = ReadWriteIOUtils.readInt(inputStream); + for (int i = 0; i < size; ++i) { + final String key = ReadWriteIOUtils.readString(inputStream); + final long value = ReadWriteIOUtils.readLong(inputStream); + consumerGroupMeta.topicNameToSubscriptionCreationTime.put(key, value); + } + } + return consumerGroupMeta; } @@ -287,6 +347,16 @@ public class ConsumerGroupMeta { consumerGroupMeta.consumerIdToConsumerMeta.put(key, value); } + consumerGroupMeta.topicNameToSubscriptionCreationTime = new ConcurrentHashMap<>(); + if (consumerGroupMeta.shouldRecordSubscriptionCreationTime()) { + size = ReadWriteIOUtils.readInt(byteBuffer); + for (int i = 0; i < size; ++i) { + final String key = ReadWriteIOUtils.readString(byteBuffer); + final long value = ReadWriteIOUtils.readLong(byteBuffer); + consumerGroupMeta.topicNameToSubscriptionCreationTime.put(key, value); + } + } + return consumerGroupMeta; } @@ -301,11 +371,13 @@ public class ConsumerGroupMeta { return false; } final ConsumerGroupMeta that = (ConsumerGroupMeta) obj; - return Objects.equals(consumerGroupId, that.consumerGroupId) - && creationTime == that.creationTime + return Objects.equals(this.consumerGroupId, that.consumerGroupId) + && this.creationTime == that.creationTime + && Objects.equals( + this.topicNameToSubscribedConsumerIdSet, that.topicNameToSubscribedConsumerIdSet) + && Objects.equals(this.consumerIdToConsumerMeta, that.consumerIdToConsumerMeta) && Objects.equals( - topicNameToSubscribedConsumerIdSet, that.topicNameToSubscribedConsumerIdSet) - && Objects.equals(consumerIdToConsumerMeta, that.consumerIdToConsumerMeta); + this.topicNameToSubscriptionCreationTime, that.topicNameToSubscriptionCreationTime); } @Override @@ -314,7 +386,8 @@ public class ConsumerGroupMeta { consumerGroupId, creationTime, topicNameToSubscribedConsumerIdSet, - consumerIdToConsumerMeta); + consumerIdToConsumerMeta, + topicNameToSubscriptionCreationTime); } @Override @@ -323,11 +396,13 @@ public class ConsumerGroupMeta { + "consumerGroupId='" + consumerGroupId + "', creationTime=" - + creationTime + + getCreationTime() + ", topicNameToSubscribedConsumerIdSet=" + topicNameToSubscribedConsumerIdSet + ", consumerIdToConsumerMeta=" + consumerIdToConsumerMeta + + ", topicNameToSubscriptionCreationTime=" + + topicNameToSubscriptionCreationTime + "}"; } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java index c2775b204a9..34dbbbd761d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -91,6 +92,12 @@ public class ConsumerGroupMetaKeeper { : Collections.emptySet(); } + public Optional<Long> getSubscriptionCreationTime(String consumerGroupId, String topic) { + return consumerGroupIdToConsumerGroupMetaMap.containsKey(consumerGroupId) + ? consumerGroupIdToConsumerGroupMetaMap.get(consumerGroupId).getSubscriptionTime(topic) + : Optional.empty(); + } + public Set<String> getTopicsSubscribedByConsumer(String consumerGroupId, String consumerId) { return consumerGroupIdToConsumerGroupMetaMap.containsKey(consumerGroupId) ? consumerGroupIdToConsumerGroupMetaMap diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java index 91bbb461223..8f644e1aa20 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java @@ -19,36 +19,34 @@ package org.apache.iotdb.commons.subscription.meta.subscription; -import org.apache.tsfile.utils.PublicBAOS; -import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; -import java.io.DataOutputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.HashSet; +import java.util.Objects; +import java.util.Optional; import java.util.Set; /** SubscriptionMeta is created for show subscription and is not stored in meta keeper. */ public class SubscriptionMeta { - private String topicName; + private TopicMeta topicMeta; private String consumerGroupId; private Set<String> consumerIds; + private Long creationTime; private SubscriptionMeta() { // Empty constructor } - public SubscriptionMeta(String topicName, String consumerGroupId, Set<String> consumerIds) { - this.topicName = topicName; + public SubscriptionMeta( + TopicMeta topicMeta, String consumerGroupId, Set<String> consumerIds, Long creationTime) { + this.topicMeta = topicMeta; this.consumerGroupId = consumerGroupId; this.consumerIds = consumerIds; + this.creationTime = creationTime; } - public String getTopicName() { - return topicName; + public TopicMeta getTopicMeta() { + return topicMeta; } public String getConsumerGroupId() { @@ -59,60 +57,14 @@ public class SubscriptionMeta { return consumerIds; } - public ByteBuffer serialize() throws IOException { - PublicBAOS byteArrayOutputStream = new PublicBAOS(); - DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); - serialize(outputStream); - return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + public Optional<Long> getCreationTime() { + return Objects.nonNull(creationTime) ? Optional.of(creationTime) : Optional.empty(); } - public void serialize(DataOutputStream outputStream) throws IOException { - ReadWriteIOUtils.write(topicName, outputStream); - ReadWriteIOUtils.write(consumerGroupId, outputStream); - - ReadWriteIOUtils.write(consumerIds.size(), outputStream); - for (String consumerId : consumerIds) { - ReadWriteIOUtils.write(consumerId, outputStream); - } - } - - public void serialize(FileOutputStream outputStream) throws IOException { - ReadWriteIOUtils.write(topicName, outputStream); - ReadWriteIOUtils.write(consumerGroupId, outputStream); - - ReadWriteIOUtils.write(consumerIds.size(), outputStream); - for (String consumerId : consumerIds) { - ReadWriteIOUtils.write(consumerId, outputStream); - } - } - - public static SubscriptionMeta deserialize(InputStream inputStream) throws IOException { - final SubscriptionMeta subscriptionMeta = new SubscriptionMeta(); - - subscriptionMeta.topicName = ReadWriteIOUtils.readString(inputStream); - subscriptionMeta.consumerGroupId = ReadWriteIOUtils.readString(inputStream); - subscriptionMeta.consumerIds = new HashSet<>(); - - int size = ReadWriteIOUtils.readInt(inputStream); - for (int i = 0; i < size; i++) { - subscriptionMeta.consumerIds.add(ReadWriteIOUtils.readString(inputStream)); - } - - return subscriptionMeta; - } - - public static SubscriptionMeta deserialize(ByteBuffer byteBuffer) { - final SubscriptionMeta subscriptionMeta = new SubscriptionMeta(); - - subscriptionMeta.topicName = ReadWriteIOUtils.readString(byteBuffer); - subscriptionMeta.consumerGroupId = ReadWriteIOUtils.readString(byteBuffer); - subscriptionMeta.consumerIds = new HashSet<>(); - - int size = ReadWriteIOUtils.readInt(byteBuffer); - for (int i = 0; i < size; i++) { - subscriptionMeta.consumerIds.add(ReadWriteIOUtils.readString(byteBuffer)); - } - - return subscriptionMeta; + public String getSubscriptionId() { + final StringBuilder subscriptionId = + new StringBuilder(topicMeta.getTopicName() + "_" + consumerGroupId); + getCreationTime().ifPresent(creationTime -> subscriptionId.append("_").append(creationTime)); + return subscriptionId.toString(); } } diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index f8551590964..4642f3e20af 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -884,6 +884,12 @@ struct TShowSubscriptionInfo { 1: required string topicName 2: required string consumerGroupId 3: required set<string> consumerIds + 4: optional i64 creationTime +} + +struct TDropSubscriptionReq { + 1: required string subsciptionId + 2: optional bool ifExistsCondition } struct TGetAllSubscriptionInfoResp { @@ -1667,9 +1673,12 @@ service IConfigNodeRPCService { /** Create subscription */ common.TSStatus createSubscription(TSubscribeReq req) - /** Close subscription */ + /** Close subscription by consumer */ common.TSStatus dropSubscription(TUnsubscribeReq req) + /** Close subscription by session */ + common.TSStatus dropSubscriptionById(TDropSubscriptionReq req) + /** Show Subscription on topic name, if name is empty, show all subscriptions */ TShowSubscriptionResp showSubscription(TShowSubscriptionReq req)