This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 93211e10489 Subscription: adapt pipe completion signal for
automatically drop subscription (#12724)
93211e10489 is described below
commit 93211e104897b0091bb76f94ecfcc3a6f13331ff
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Jun 14 13:05:24 2024 +0800
Subscription: adapt pipe completion signal for automatically drop
subscription (#12724)
---
.../apache/iotdb/SubscriptionSessionExample.java | 108 ++++++++++++++-
.../it/env/cluster/config/MppCommonConfig.java | 26 ++++
.../env/cluster/config/MppSharedCommonConfig.java | 25 ++++
.../it/env/remote/config/RemoteCommonConfig.java | 17 +++
.../org/apache/iotdb/itbase/env/CommonConfig.java | 7 +
.../it/dual/IoTDBSubscriptionTopicIT.java | 123 +++++++++++++++++
.../iotdb/rpc/subscription/config/TopicConfig.java | 32 ++++-
.../rpc/subscription/config/TopicConstant.java | 5 +
.../payload/poll/SubscriptionPollResponse.java | 3 +
.../payload/poll/SubscriptionPollResponseType.java | 2 +
.../payload/poll/TerminationPayload.java | 42 ++++++
.../consumer/SubscriptionConsumer.java | 18 ++-
.../subscription/CreateSubscriptionProcedure.java | 2 +
.../subscription/DropSubscriptionProcedure.java | 151 +++++++++++----------
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 18 ++-
.../agent/SubscriptionConsumerAgent.java | 7 +
.../subscription/agent/SubscriptionTopicAgent.java | 12 ++
.../db/subscription/broker/SubscriptionBroker.java | 13 ++
.../broker/SubscriptionPrefetchingQueue.java | 26 +++-
.../SubscriptionPrefetchingTabletsQueue.java | 12 ++
.../broker/SubscriptionPrefetchingTsFileQueue.java | 35 ++---
.../SubscriptionConnectorSubtaskLifeCycle.java | 5 +-
.../commons/pipe/agent/task/PipeTaskAgent.java | 24 +++-
.../meta/consumer/ConsumerGroupMeta.java | 79 +++++++----
.../commons/subscription/meta/topic/TopicMeta.java | 2 +
25 files changed, 661 insertions(+), 133 deletions(-)
diff --git
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
index 3aea4e0a762..3c3ae618ce3 100644
---
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
+++
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
@@ -25,9 +25,13 @@ import
org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.AckStrategy;
+import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
import org.apache.tsfile.read.TsFileReader;
import org.apache.tsfile.read.common.Path;
@@ -35,6 +39,7 @@ import org.apache.tsfile.read.expression.QueryExpression;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
import java.io.IOException;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -51,6 +56,8 @@ public class SubscriptionSessionExample {
private static final String TOPIC_1 = "topic1";
private static final String TOPIC_2 = "`'topic2'`";
+ private static final String TOPIC_3 = "`\"topic3\"`";
+ private static final String TOPIC_4 = "`\"top \\.i.c4\"`";
private static final long SLEEP_NS = 1_000_000_000L;
private static final long POLL_TIMEOUT_MS = 10_000L;
@@ -226,10 +233,109 @@ public class SubscriptionSessionExample {
}
}
+ /** Multiple push consumers subscribe to a topic with tsfile format and query mode. */
+ private static void dataSubscription3() throws Exception {
+ try (final SubscriptionSession subscriptionSession = new
SubscriptionSession(HOST, PORT)) {
+ subscriptionSession.open();
+ final Properties config = new Properties();
+ config.put(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+ config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_QUERY_VALUE);
+ subscriptionSession.createTopic(TOPIC_3, config);
+ }
+
+ final List<Thread> threads = new ArrayList<>();
+ for (int i = 0; i < PARALLELISM; ++i) {
+ final int idx = i;
+ final Thread thread =
+ new Thread(
+ () -> {
+ // Subscription: builder-style ctor
+ try (final SubscriptionPushConsumer consumer3 =
+ new SubscriptionPushConsumer.Builder()
+ .consumerId("c" + idx)
+ .consumerGroupId("cg3")
+ .ackStrategy(AckStrategy.AFTER_CONSUME)
+ .consumeListener(
+ message -> {
+ // do something for SubscriptionTsFileHandler
+ System.out.println(
+
message.getTsFileHandler().getFile().getAbsolutePath());
+ return ConsumeResult.SUCCESS;
+ })
+ .buildPushConsumer()) {
+ consumer3.open();
+ consumer3.subscribe(TOPIC_3);
+ while (consumer3.hasMoreData()) {
+ LockSupport.parkNanos(SLEEP_NS); // wait some time
+ }
+ }
+ });
+ thread.start();
+ threads.add(thread);
+ }
+
+ for (final Thread thread : threads) {
+ thread.join();
+ }
+ }
+
+ /** Multiple pull consumers subscribe to a topic with tsfile format and query mode. */
+ private static void dataSubscription4() throws Exception {
+ try (final SubscriptionSession subscriptionSession = new
SubscriptionSession(HOST, PORT)) {
+ subscriptionSession.open();
+ final Properties config = new Properties();
+ config.put(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+ config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_QUERY_VALUE);
+ subscriptionSession.createTopic(TOPIC_4, config);
+ }
+
+ final List<Thread> threads = new ArrayList<>();
+ for (int i = 0; i < PARALLELISM; ++i) {
+ final int idx = i;
+ final Thread thread =
+ new Thread(
+ () -> {
+ // Subscription: builder-style ctor
+ try (final SubscriptionPullConsumer consumer4 =
+ new SubscriptionPullConsumer.Builder()
+ .consumerId("c" + idx)
+ .consumerGroupId("cg4")
+ .autoCommit(true)
+ .fileSaveFsync(true)
+ .buildPullConsumer()) {
+ consumer4.open();
+ consumer4.subscribe(TOPIC_4);
+ while (true) {
+ LockSupport.parkNanos(SLEEP_NS); // wait some time
+ if (!consumer4.hasMoreData()) {
+ break;
+ }
+ for (final SubscriptionMessage message :
consumer4.poll(POLL_TIMEOUT_MS)) {
+ final SubscriptionTsFileHandler handler =
message.getTsFileHandler();
+ handler.moveFile(
+ Paths.get(System.getProperty("user.dir"),
"new-subscription-dir")
+ .resolve(handler.getPath().getFileName()));
+ }
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ thread.start();
+ threads.add(thread);
+ }
+
+ for (final Thread thread : threads) {
+ thread.join();
+ }
+ }
+
public static void main(final String[] args) throws Exception {
prepareData();
// dataQuery();
// dataSubscription1();
- dataSubscription2();
+ // dataSubscription2();
+ // dataSubscription3();
+ dataSubscription4();
}
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 95fed66d08b..095562db5a5 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -438,6 +438,32 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
+ int pipeHeartbeatIntervalSecondsForCollectingPipeMeta) {
+ setProperty(
+ "pipe_heartbeat_interval_seconds_for_collecting_pipe_meta",
+ String.valueOf(pipeHeartbeatIntervalSecondsForCollectingPipeMeta));
+ return this;
+ }
+
+ @Override
+ public CommonConfig setPipeMetaSyncerInitialSyncDelayMinutes(
+ long pipeMetaSyncerInitialSyncDelayMinutes) {
+ setProperty(
+ "pipe_meta_syncer_initial_sync_delay_minutes",
+ String.valueOf(pipeMetaSyncerInitialSyncDelayMinutes));
+ return this;
+ }
+
+ @Override
+ public CommonConfig setPipeMetaSyncerSyncIntervalMinutes(long
pipeMetaSyncerSyncIntervalMinutes) {
+ setProperty(
+ "pipe_meta_syncer_sync_interval_minutes",
+ String.valueOf(pipeMetaSyncerSyncIntervalMinutes));
+ return this;
+ }
+
// For part of the log directory
public String getClusterConfigStr() {
return
fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 1851590dd41..7724bcf09c0 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -445,4 +445,29 @@ public class MppSharedCommonConfig implements CommonConfig
{
cnConfig.setCnConnectionTimeoutMs(connectionTimeoutMs);
return this;
}
+
+ @Override
+ public CommonConfig setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
+ int pipeHeartbeatIntervalSecondsForCollectingPipeMeta) {
+ dnConfig.setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
+ pipeHeartbeatIntervalSecondsForCollectingPipeMeta);
+ cnConfig.setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
+ pipeHeartbeatIntervalSecondsForCollectingPipeMeta);
+ return this;
+ }
+
+ @Override
+ public CommonConfig setPipeMetaSyncerInitialSyncDelayMinutes(
+ long pipeMetaSyncerInitialSyncDelayMinutes) {
+
dnConfig.setPipeMetaSyncerInitialSyncDelayMinutes(pipeMetaSyncerInitialSyncDelayMinutes);
+
cnConfig.setPipeMetaSyncerInitialSyncDelayMinutes(pipeMetaSyncerInitialSyncDelayMinutes);
+ return this;
+ }
+
+ @Override
+ public CommonConfig setPipeMetaSyncerSyncIntervalMinutes(long
pipeMetaSyncerSyncIntervalMinutes) {
+
dnConfig.setPipeMetaSyncerSyncIntervalMinutes(pipeMetaSyncerSyncIntervalMinutes);
+
cnConfig.setPipeMetaSyncerSyncIntervalMinutes(pipeMetaSyncerSyncIntervalMinutes);
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 0ac82fc8713..36675e64686 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -313,4 +313,21 @@ public class RemoteCommonConfig implements CommonConfig {
public CommonConfig setCnConnectionTimeoutMs(int connectionTimeoutMs) {
return this;
}
+
+ @Override
+ public CommonConfig setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
+ int pipeHeartbeatIntervalSecondsForCollectingPipeMeta) {
+ return this;
+ }
+
+ @Override
+ public CommonConfig setPipeMetaSyncerInitialSyncDelayMinutes(
+ long pipeMetaSyncerInitialSyncDelayMinutes) {
+ return this;
+ }
+
+ @Override
+ public CommonConfig setPipeMetaSyncerSyncIntervalMinutes(long
pipeMetaSyncerSyncIntervalMinutes) {
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index af47c8254e5..c06c45ddcd4 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -139,4 +139,11 @@ public interface CommonConfig {
CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize);
CommonConfig setCnConnectionTimeoutMs(int connectionTimeoutMs);
+
+ CommonConfig setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
+ int pipeHeartbeatIntervalSecondsForCollectingPipeMeta);
+
+ CommonConfig setPipeMetaSyncerInitialSyncDelayMinutes(long
pipeMetaSyncerInitialSyncDelayMinutes);
+
+ CommonConfig setPipeMetaSyncerSyncIntervalMinutes(long
pipeMetaSyncerSyncIntervalMinutes);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index ac7e8333984..19c033af151 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -20,12 +20,15 @@
package org.apache.iotdb.subscription.it.dual;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import
org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeCriticalException;
import org.apache.iotdb.session.subscription.SubscriptionSession;
@@ -33,6 +36,10 @@ import
org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
+import org.apache.tsfile.read.TsFileReader;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.expression.QueryExpression;
+import org.apache.tsfile.read.query.dataset.QueryDataSet;
import org.apache.tsfile.write.record.Tablet;
import org.junit.Assert;
import org.junit.Test;
@@ -44,6 +51,7 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -63,6 +71,19 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionTopicIT.class);
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ // Shorten heartbeat and sync interval to avoid timeout of query mode test
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(30);
+
senderEnv.getConfig().getCommonConfig().setPipeMetaSyncerInitialSyncDelayMinutes(1);
+
senderEnv.getConfig().getCommonConfig().setPipeMetaSyncerSyncIntervalMinutes(1);
+ }
+
@Test
public void testTopicPathSubscription() throws Exception {
// Insert some historical data on sender
@@ -529,6 +550,108 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
testTopicInvalidRuntimeConfigTemplate("topic10", config);
}
+ @Test
+ public void testTopicWithQueryMode() throws Exception {
+ // Insert some historical data
+ try (final ISession session = senderEnv.getSessionConnection()) {
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Create topic
+ final String topicName = "topic11";
+ final String host = senderEnv.getIP();
+ final int port = Integer.parseInt(senderEnv.getPort());
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
+ session.open();
+ final Properties config = new Properties();
+ config.put(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+ config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_QUERY_VALUE);
+ session.createTopic(topicName, config);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscription
+ final AtomicInteger rowCount = new AtomicInteger();
+ final AtomicBoolean isClosed = new AtomicBoolean(false);
+ final Thread thread =
+ new Thread(
+ () -> {
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(false)
+ .buildPullConsumer()) {
+ consumer.open();
+ consumer.subscribe(topicName);
+ while (!isClosed.get()) {
+ LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS);
// wait some time
+ final List<SubscriptionMessage> messages =
+
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+ for (final SubscriptionMessage message : messages) {
+ try (final TsFileReader tsFileReader =
+ message.getTsFileHandler().openReader()) {
+ final Path path = new Path("root.db.d1", "s1", true);
+ final QueryDataSet dataSet =
+ tsFileReader.query(
+
QueryExpression.create(Collections.singletonList(path), null));
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ rowCount.addAndGet(1);
+ }
+ }
+ }
+ consumer.commitSync(messages);
+ }
+ // Exiting the loop means the awaitility test has passed. At
this point the result
+ // of 'show subscription' is empty, so there is no need to
explicitly unsubscribe.
+ } catch (final Exception e) {
+ e.printStackTrace();
+ // Avoid failure
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+ },
+ String.format("%s - consumer", testName.getMethodName()));
+ thread.start();
+
+ try {
+ // Keep retrying if there are execution failures
+ AWAIT.untilAsserted(
+ () -> {
+ // Check row count
+ Assert.assertEquals(100, rowCount.get());
+ // Check empty subscription
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final TShowSubscriptionResp showSubscriptionResp =
+ client.showSubscription(new TShowSubscriptionReq());
+ Assert.assertEquals(
+ RpcUtils.SUCCESS_STATUS.getCode(),
showSubscriptionResp.status.getCode());
+ Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList);
+ Assert.assertEquals(0,
showSubscriptionResp.subscriptionInfoList.size());
+ }
+ });
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ isClosed.set(true);
+ thread.join();
+ }
+ }
+
private void testTopicInvalidRuntimeConfigTemplate(
final String topicName, final Properties config) throws Exception {
// Create topic
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
index fe62ba622a9..c3b5f24622c 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
@@ -40,6 +40,24 @@ public class TopicConfig extends PipeParameters {
super(attributes);
}
+ private static final Map<String, String> LOOSE_RANGE_TIME_CONFIG =
+ new HashMap<String, String>() {
+ {
+ put("history.loose-range", "time");
+ put("realtime.loose-range", "time");
+ }
+ };
+
+ private static final Map<String, String> REALTIME_BATCH_MODE_CONFIG =
+ Collections.singletonMap("realtime.mode", "batch");
+ private static final Map<String, String> REALTIME_STREAM_MODE_CONFIG =
+ Collections.singletonMap("realtime.mode", "stream");
+
+ private static final Map<String, String> QUERY_MODE_CONFIG =
+ Collections.singletonMap("mode", "query");
+ private static final Map<String, String> SUBSCRIBE_MODE_CONFIG =
+ Collections.singletonMap("mode", "subscribe");
+
/////////////////////////////// de/ser ///////////////////////////////
public void serialize(final DataOutputStream stream) throws IOException {
@@ -87,8 +105,7 @@ public class TopicConfig extends PipeParameters {
// enable loose range when using tsfile format
if (TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(
attributes.getOrDefault(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_DEFAULT_VALUE))) {
- attributesWithTimeRange.put("history.loose-range", "time");
- attributesWithTimeRange.put("realtime.loose-range", "time");
+ attributesWithTimeRange.putAll(LOOSE_RANGE_TIME_CONFIG);
}
return attributesWithTimeRange;
@@ -97,8 +114,15 @@ public class TopicConfig extends PipeParameters {
public Map<String, String> getAttributesWithRealtimeMode() {
return TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(
attributes.getOrDefault(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_DEFAULT_VALUE))
- ? Collections.singletonMap("realtime.mode", "batch")
- : Collections.singletonMap("realtime.mode", "hybrid");
+ ? REALTIME_BATCH_MODE_CONFIG
+ : REALTIME_STREAM_MODE_CONFIG;
+ }
+
+ public Map<String, String> getAttributesWithSourceMode() {
+ return TopicConstant.MODE_QUERY_VALUE.equals(
+ attributes.getOrDefault(TopicConstant.MODE_KEY,
TopicConstant.MODE_DEFAULT_VALUE))
+ ? QUERY_MODE_CONFIG
+ : SUBSCRIBE_MODE_CONFIG;
}
public Map<String, String> getAttributesWithProcessorPrefix() {
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
index bd44cf1bcc9..a6e8ec901bf 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
@@ -30,6 +30,11 @@ public class TopicConstant {
public static final String END_TIME_KEY = "end-time";
public static final String NOW_TIME_VALUE = "now";
+ public static final String MODE_KEY = "mode";
+ public static final String MODE_SUBSCRIBE_VALUE = "subscribe";
+ public static final String MODE_QUERY_VALUE = "query";
+ public static final String MODE_DEFAULT_VALUE = MODE_SUBSCRIBE_VALUE;
+
public static final String FORMAT_KEY = "format";
public static final String FORMAT_SESSION_DATA_SETS_HANDLER_VALUE =
"SessionDataSetsHandler";
public static final String FORMAT_TS_FILE_HANDLER_VALUE = "TsFileHandler";
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
index d386903f124..01d173d2742 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
@@ -95,6 +95,9 @@ public class SubscriptionPollResponse {
case ERROR:
payload = new ErrorPayload().deserialize(buffer);
break;
+ case TERMINATION:
+ payload = new TerminationPayload().deserialize(buffer);
+ break;
default:
LOGGER.warn("unexpected response type: {}, payload will be null",
responseType);
break;
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponseType.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponseType.java
index a22c9590c65..b27791b36c5 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponseType.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponseType.java
@@ -31,6 +31,8 @@ public enum SubscriptionPollResponseType {
FILE_INIT((short) 2),
FILE_PIECE((short) 3),
FILE_SEAL((short) 4),
+
+ TERMINATION((short) 5),
;
private final short type;
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TerminationPayload.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TerminationPayload.java
new file mode 100644
index 00000000000..c2da3f92e6d
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TerminationPayload.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.poll;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+public class TerminationPayload implements SubscriptionPollPayload {
+
+ @Override
+ public void serialize(final DataOutputStream stream) {
+ // do nothing
+ }
+
+ @Override
+ public SubscriptionPollPayload deserialize(final ByteBuffer buffer) {
+ // do nothing
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "TerminationPayload";
+ }
+}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index a6b418f0220..410348c991b 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -108,8 +108,12 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
return consumerGroupId;
}
- public Set<String> getSubscribedTopicNames() {
- return subscribedTopicNames;
+ /**
+ * @return {@code false} if the consumer has <b>only</b> subscribed to query
mode topics and
+ * there is no new data to process; otherwise, {@code true}.
+ */
+ public boolean hasMoreData() {
+ return !subscribedTopicNames.isEmpty();
}
/////////////////////////////// ctor ///////////////////////////////
@@ -393,6 +397,16 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
} else {
throw new
SubscriptionRuntimeNonCriticalException(errorMessage);
}
+ case TERMINATION:
+ final SubscriptionCommitContext commitContext =
pollResponse.getCommitContext();
+ final String topicNameToUnsubscribe =
commitContext.getTopicName();
+ LOGGER.info(
+ "Termination occurred when SubscriptionConsumer {} polling
topics {}, unsubscribe topic {} automatically",
+ this,
+ topicNames,
+ topicNameToUnsubscribe);
+ unsubscribe(Collections.singleton(topicNameToUnsubscribe),
false);
+ break;
default:
LOGGER.warn("unexpected response type: {}", responseType);
break;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index 8710eec8522..0894feb62ad 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -229,6 +229,7 @@ public class CreateSubscriptionProcedure extends
AbstractOperateSubscriptionAndP
protected void rollbackFromOperateOnConfigNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("CreateSubscriptionProcedure:
rollbackFromOperateOnConfigNodes");
+ // TODO: roll back from the last executed procedure to the first executed
alterConsumerGroupProcedure.rollbackFromOperateOnConfigNodes(env);
TSStatus response;
@@ -283,6 +284,7 @@ public class CreateSubscriptionProcedure extends
AbstractOperateSubscriptionAndP
protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
LOGGER.info("CreateSubscriptionProcedure: rollbackFromOperateOnDataNodes");
+ // TODO: roll back from the last executed procedure to the first executed
alterConsumerGroupProcedure.rollbackFromOperateOnDataNodes(env);
// Push all topic metas to datanode, may be time-consuming
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
index efb5aa22827..89da284ac0d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
@@ -62,9 +62,10 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
private TUnsubscribeReq unsubscribeReq;
- private AlterConsumerGroupProcedure alterConsumerGroupProcedure;
- private List<AlterTopicProcedure> alterTopicProcedures = new ArrayList<>();
+ // NOTE: The 'drop pipe' operation should be performed before 'alter
consumer group'.
private List<DropPipeProcedureV2> dropPipeProcedures = new ArrayList<>();
+ private List<AlterTopicProcedure> alterTopicProcedures = new ArrayList<>();
+ private AlterConsumerGroupProcedure alterConsumerGroupProcedure;
// Record failed index of procedures to rollback properly.
// We only record fail index when executing on config nodes, because when
executing on data nodes
@@ -76,7 +77,7 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
super();
}
- public DropSubscriptionProcedure(TUnsubscribeReq unsubscribeReq) {
+ public DropSubscriptionProcedure(final TUnsubscribeReq unsubscribeReq) {
this.unsubscribeReq = unsubscribeReq;
}
@@ -86,27 +87,28 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
}
@Override
- protected void executeFromValidate(ConfigNodeProcedureEnv env) throws
SubscriptionException {
+ protected void executeFromValidate(final ConfigNodeProcedureEnv env)
+ throws SubscriptionException {
LOGGER.info("DropSubscriptionProcedure: executeFromValidate");
subscriptionInfo.get().validateBeforeUnsubscribe(unsubscribeReq);
// Construct AlterConsumerGroupProcedure
- ConsumerGroupMeta updatedConsumerGroupMeta =
+ final ConsumerGroupMeta updatedConsumerGroupMeta =
subscriptionInfo.get().deepCopyConsumerGroupMeta(unsubscribeReq.getConsumerGroupId());
// Get topics subscribed by no consumers in this group after this
un-subscription
- Set<String> topicsUnsubByGroup =
+ final Set<String> topicsUnsubByGroup =
updatedConsumerGroupMeta.removeSubscription(
unsubscribeReq.getConsumerId(), unsubscribeReq.getTopicNames());
alterConsumerGroupProcedure =
new AlterConsumerGroupProcedure(updatedConsumerGroupMeta,
subscriptionInfo);
- for (String topic : unsubscribeReq.getTopicNames()) {
+ for (final String topic : unsubscribeReq.getTopicNames()) {
if (topicsUnsubByGroup.contains(topic)) {
// Topic will be subscribed by no consumers in this group
- TopicMeta updatedTopicMeta =
subscriptionInfo.get().deepCopyTopicMeta(topic);
+ final TopicMeta updatedTopicMeta =
subscriptionInfo.get().deepCopyTopicMeta(topic);
updatedTopicMeta.removeSubscribedConsumerGroup(unsubscribeReq.getConsumerGroupId());
alterTopicProcedures.add(new AlterTopicProcedure(updatedTopicMeta,
subscriptionInfo));
@@ -118,38 +120,39 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
}
}
- alterConsumerGroupProcedure.executeFromValidate(env);
+ // Validate DropPipeProcedureV2s
+ for (final DropPipeProcedureV2 dropPipeProcedure : dropPipeProcedures) {
+ dropPipeProcedure.executeFromValidateTask(env);
+ dropPipeProcedure.executeFromCalculateInfoForTask(env);
+ }
- for (AlterTopicProcedure alterTopicProcedure : alterTopicProcedures) {
+ // Validate AlterTopicProcedures
+ for (final AlterTopicProcedure alterTopicProcedure : alterTopicProcedures)
{
alterTopicProcedure.executeFromValidate(env);
}
- for (DropPipeProcedureV2 dropPipeProcedure : dropPipeProcedures) {
- dropPipeProcedure.executeFromValidateTask(env);
- dropPipeProcedure.executeFromCalculateInfoForTask(env);
- }
+ // Validate AlterConsumerGroupProcedure
+ alterConsumerGroupProcedure.executeFromValidate(env);
}
@Override
- protected void executeFromOperateOnConfigNodes(ConfigNodeProcedureEnv env)
+ protected void executeFromOperateOnConfigNodes(final ConfigNodeProcedureEnv
env)
throws SubscriptionException {
LOGGER.info("DropSubscriptionProcedure: executeFromOperateOnConfigNodes");
- alterConsumerGroupProcedure.executeFromOperateOnConfigNodes(env);
-
TSStatus response;
- List<AlterTopicPlan> alterTopicPlans =
- alterTopicProcedures.stream()
- .map(AlterTopicProcedure::getUpdatedTopicMeta)
- .map(AlterTopicPlan::new)
+ // Execute DropPipeProcedureV2s
+ final List<ConfigPhysicalPlan> dropPipePlans =
+ dropPipeProcedures.stream()
+ .map(proc -> new DropPipePlanV2(proc.getPipeName()))
.collect(Collectors.toList());
try {
response =
env.getConfigManager()
.getConsensusManager()
- .write(new AlterMultipleTopicsPlan(alterTopicPlans));
- } catch (ConsensusException e) {
+ .write(new OperateMultiplePipesPlanV2(dropPipePlans));
+ } catch (final ConsensusException e) {
LOGGER.warn("Failed in the write API executing the consensus layer due
to: ", e);
response = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
response.setMessage(e.getMessage());
@@ -157,19 +160,21 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& response.getSubStatusSize() > 0) {
// Record the failed index for rollback
- alterTopicProcedureFailIndexOnCN = response.getSubStatusSize() - 1;
+ dropPipeProcedureFailIndexOnCN = response.getSubStatusSize() - 1;
}
- List<ConfigPhysicalPlan> dropPipePlans =
- dropPipeProcedures.stream()
- .map(proc -> new DropPipePlanV2(proc.getPipeName()))
+ // Execute AlterTopicProcedures
+ final List<AlterTopicPlan> alterTopicPlans =
+ alterTopicProcedures.stream()
+ .map(AlterTopicProcedure::getUpdatedTopicMeta)
+ .map(AlterTopicPlan::new)
.collect(Collectors.toList());
try {
response =
env.getConfigManager()
.getConsensusManager()
- .write(new OperateMultiplePipesPlanV2(dropPipePlans));
- } catch (ConsensusException e) {
+ .write(new AlterMultipleTopicsPlan(alterTopicPlans));
+ } catch (final ConsensusException e) {
LOGGER.warn("Failed in the write API executing the consensus layer due
to: ", e);
response = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
response.setMessage(e.getMessage());
@@ -177,34 +182,24 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& response.getSubStatusSize() > 0) {
// Record the failed index for rollback
- dropPipeProcedureFailIndexOnCN = response.getSubStatusSize() - 1;
+ alterTopicProcedureFailIndexOnCN = response.getSubStatusSize() - 1;
}
+
+ // Execute AlterConsumerGroupProcedure
+ alterConsumerGroupProcedure.executeFromOperateOnConfigNodes(env);
}
@Override
- protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
+ protected void executeFromOperateOnDataNodes(final ConfigNodeProcedureEnv
env)
throws SubscriptionException, IOException {
LOGGER.info("DropSubscriptionProcedure: executeFromOperateOnDataNodes");
- alterConsumerGroupProcedure.executeFromOperateOnDataNodes(env);
-
- // push topic meta to data nodes
- List<ByteBuffer> topicMetaBinaryList = new ArrayList<>();
- for (AlterTopicProcedure alterTopicProcedure : alterTopicProcedures) {
-
topicMetaBinaryList.add(alterTopicProcedure.getUpdatedTopicMeta().serialize());
- }
- if
(pushTopicMetaHasException(env.pushMultiTopicMetaToDataNodes(topicMetaBinaryList)))
{
- // If not all topic meta are pushed successfully, the meta can be pushed during meta sync.
- LOGGER.warn(
- "Failed to alter topics when creating subscription, metadata will be
synchronized later.");
- }
-
- // push pipe meta to data nodes
- List<String> pipeNames =
+ // Push pipe meta to data nodes
+ final List<String> pipeNames =
dropPipeProcedures.stream()
.map(DropPipeProcedureV2::getPipeName)
.collect(Collectors.toList());
- String exceptionMessage =
+ final String exceptionMessage =
AbstractOperatePipeProcedureV2.parsePushPipeMetaExceptionForPipe(
null, dropMultiPipeOnDataNodes(pipeNames, env));
if (!exceptionMessage.isEmpty()) {
@@ -214,23 +209,37 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
pipeNames,
exceptionMessage);
}
+
+ // Push topic meta to data nodes
+ final List<ByteBuffer> topicMetaBinaryList = new ArrayList<>();
+ for (final AlterTopicProcedure alterTopicProcedure : alterTopicProcedures)
{
+
topicMetaBinaryList.add(alterTopicProcedure.getUpdatedTopicMeta().serialize());
+ }
+ if
(pushTopicMetaHasException(env.pushMultiTopicMetaToDataNodes(topicMetaBinaryList)))
{
+ // If not all topic meta are pushed successfully, the meta can be pushed during meta sync.
+ LOGGER.warn(
+ "Failed to alter topics when creating subscription, metadata will be
synchronized later.");
+ }
+
+ // Push consumer group meta to data nodes
+ alterConsumerGroupProcedure.executeFromOperateOnDataNodes(env);
}
@Override
- protected void rollbackFromValidate(ConfigNodeProcedureEnv env) {
+ protected void rollbackFromValidate(final ConfigNodeProcedureEnv env) {
LOGGER.info("DropSubscriptionProcedure: rollbackFromLock");
}
@Override
- protected void rollbackFromOperateOnConfigNodes(ConfigNodeProcedureEnv env) {
+ protected void rollbackFromOperateOnConfigNodes(final ConfigNodeProcedureEnv
env) {
LOGGER.info("DropSubscriptionProcedure: rollbackFromOperateOnConfigNodes");
+ // Rollback AlterConsumerGroupProcedure
alterConsumerGroupProcedure.rollbackFromOperateOnConfigNodes(env);
+ // Rollback AlterTopicProcedures
TSStatus response;
-
- // rollback alterTopicProcedures
- List<AlterTopicPlan> alterTopicRollbackPlans = new ArrayList<>();
+ final List<AlterTopicPlan> alterTopicRollbackPlans = new ArrayList<>();
for (int i = 0;
i <= Math.min(alterTopicProcedureFailIndexOnCN,
alterTopicProcedures.size());
i++) {
@@ -242,23 +251,25 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
env.getConfigManager()
.getConsensusManager()
.write(new AlterMultipleTopicsPlan(alterTopicRollbackPlans));
- } catch (ConsensusException e) {
+ } catch (final ConsensusException e) {
LOGGER.warn("Failed in the write API executing the consensus layer due
to: ", e);
response = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
response.setMessage(e.getMessage());
}
- // if failed to rollback, throw exception
+ // If failed to rollback, throw exception
if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new SubscriptionException(response.getMessage());
}
- // Do nothing to rollback dropPipeProcedures
+ // Do nothing to rollback DropPipeProcedureV2s
}
@Override
- protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
+ protected void rollbackFromOperateOnDataNodes(final ConfigNodeProcedureEnv
env)
+ throws IOException {
LOGGER.info("DropSubscriptionProcedure: rollbackFromOperateOnDataNodes");
+ // Rollback AlterConsumerGroupProcedure
alterConsumerGroupProcedure.rollbackFromOperateOnDataNodes(env);
// Push all topic metas to datanode, may be time-consuming
@@ -268,7 +279,7 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
}
// Push all pipe metas to datanode, may be time-consuming
- String exceptionMessage =
+ final String exceptionMessage =
AbstractOperatePipeProcedureV2.parsePushPipeMetaExceptionForPipe(
null, AbstractOperatePipeProcedureV2.pushPipeMetaToDataNodes(env,
pipeTaskInfo));
if (!exceptionMessage.isEmpty()) {
@@ -279,7 +290,7 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
}
@Override
- public void serialize(DataOutputStream stream) throws IOException {
+ public void serialize(final DataOutputStream stream) throws IOException {
stream.writeShort(ProcedureType.DROP_SUBSCRIPTION_PROCEDURE.getTypeCode());
super.serialize(stream);
@@ -289,7 +300,7 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
final int size = unsubscribeReq.getTopicNamesSize();
ReadWriteIOUtils.write(size, stream);
if (size != 0) {
- for (String topicName : unsubscribeReq.getTopicNames()) {
+ for (final String topicName : unsubscribeReq.getTopicNames()) {
ReadWriteIOUtils.write(topicName, stream);
}
}
@@ -306,7 +317,7 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
if (alterTopicProcedures != null) {
ReadWriteIOUtils.write(true, stream);
ReadWriteIOUtils.write(alterTopicProcedures.size(), stream);
- for (AlterTopicProcedure topicProcedure : alterTopicProcedures) {
+ for (final AlterTopicProcedure topicProcedure : alterTopicProcedures) {
topicProcedure.serialize(stream);
}
} else {
@@ -317,7 +328,7 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
if (dropPipeProcedures != null) {
ReadWriteIOUtils.write(true, stream);
ReadWriteIOUtils.write(dropPipeProcedures.size(), stream);
- for (AbstractOperatePipeProcedureV2 pipeProcedure : dropPipeProcedures) {
+ for (final AbstractOperatePipeProcedureV2 pipeProcedure :
dropPipeProcedures) {
pipeProcedure.serialize(stream);
}
} else {
@@ -326,7 +337,7 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
}
@Override
- public void deserialize(ByteBuffer byteBuffer) {
+ public void deserialize(final ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
unsubscribeReq =
@@ -355,7 +366,7 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
// This readShort should return ALTER_TOPIC_PROCEDURE, and we ignore it.
ReadWriteIOUtils.readShort(byteBuffer);
- AlterTopicProcedure topicProcedure = new AlterTopicProcedure();
+ final AlterTopicProcedure topicProcedure = new AlterTopicProcedure();
topicProcedure.deserialize(byteBuffer);
alterTopicProcedures.add(topicProcedure);
}
@@ -366,9 +377,9 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
// This readShort should return DROP_PIPE_PROCEDURE.
- short typeCode = ReadWriteIOUtils.readShort(byteBuffer);
+ final short typeCode = ReadWriteIOUtils.readShort(byteBuffer);
if (typeCode == ProcedureType.DROP_PIPE_PROCEDURE_V2.getTypeCode()) {
- DropPipeProcedureV2 dropPipeProcedureV2 = new DropPipeProcedureV2();
+ final DropPipeProcedureV2 dropPipeProcedureV2 = new
DropPipeProcedureV2();
dropPipeProcedureV2.deserialize(byteBuffer);
dropPipeProcedures.add(dropPipeProcedureV2);
}
@@ -377,14 +388,14 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
}
@Override
- public boolean equals(Object o) {
+ public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
- DropSubscriptionProcedure that = (DropSubscriptionProcedure) o;
+ final DropSubscriptionProcedure that = (DropSubscriptionProcedure) o;
return Objects.equals(getProcId(), that.getProcId())
&& Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
@@ -408,7 +419,7 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
@TestOnly
public void setAlterConsumerGroupProcedure(
- AlterConsumerGroupProcedure alterConsumerGroupProcedure) {
+ final AlterConsumerGroupProcedure alterConsumerGroupProcedure) {
this.alterConsumerGroupProcedure = alterConsumerGroupProcedure;
}
@@ -418,7 +429,7 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
}
@TestOnly
- public void setAlterTopicProcedures(List<AlterTopicProcedure>
alterTopicProcedures) {
+ public void setAlterTopicProcedures(final List<AlterTopicProcedure>
alterTopicProcedures) {
this.alterTopicProcedures = alterTopicProcedures;
}
@@ -428,7 +439,7 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
}
@TestOnly
- public void setDropPipeProcedures(List<DropPipeProcedureV2>
dropPipeProcedures) {
+ public void setDropPipeProcedures(final List<DropPipeProcedureV2>
dropPipeProcedures) {
this.dropPipeProcedures = dropPipeProcedures;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 37c04e26f50..e50f1edcc69 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -238,16 +238,22 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
@Override
- protected void dropPipe(final String pipeName, final long creationTime) {
- super.dropPipe(pipeName, creationTime);
+ protected boolean dropPipe(final String pipeName, final long creationTime) {
+ if (!super.dropPipe(pipeName, creationTime)) {
+ return false;
+ }
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.deregister(pipeName + "_" + creationTime);
+
+ return true;
}
@Override
- protected void dropPipe(final String pipeName) {
- super.dropPipe(pipeName);
+ protected boolean dropPipe(final String pipeName) {
+ if (!super.dropPipe(pipeName)) {
+ return false;
+ }
final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (Objects.nonNull(pipeMeta)) {
@@ -255,6 +261,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.deregister(pipeName + "_" + creationTime);
}
+
+ return true;
}
public void stopAllPipesWithCriticalException() {
@@ -612,7 +620,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
///////////////////////// Pipe Consensus /////////////////////////
- public ProgressIndex getPipeTaskProgressIndex(String pipeName, int
consensusGroupId) {
+ public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final
int consensusGroupId) {
if (!tryReadLockWithTimeOut(10)) {
throw new PipeException(
String.format(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
index 5284d295863..983abb03642 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
@@ -114,6 +114,13 @@ public class SubscriptionConsumerAgent {
return;
}
+ // unbind and remove prefetching queue
+ final Set<String> topicsUnsubByGroup =
+ ConsumerGroupMeta.getTopicsUnsubByGroup(metaInAgent,
metaFromCoordinator);
+ for (final String topicName : topicsUnsubByGroup) {
+ SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId,
topicName, true);
+ }
+
// TODO: Currently we fully replace the entire ConsumerGroupMeta without carefully checking the
// changes in its fields.
consumerGroupMetaKeeper.removeConsumerGroupMeta(consumerGroupId);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
index 45145bd2b67..bc174170422 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
@@ -149,4 +149,16 @@ public class SubscriptionTopicAgent {
releaseReadLock();
}
}
+
+ public String getTopicMode(final String topicName) {
+ acquireReadLock();
+ try {
+ return topicMetaKeeper
+ .getTopicMeta(topicName)
+ .getConfig()
+ .getStringOrDefault(TopicConstant.MODE_KEY,
TopicConstant.MODE_DEFAULT_VALUE);
+ } finally {
+ releaseReadLock();
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index d26ea8b4458..f3bd0371fdd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -65,6 +65,14 @@ public class SubscriptionBroker {
final String topicName = entry.getKey();
final SubscriptionPrefetchingQueue prefetchingQueue = entry.getValue();
if (topicNames.contains(topicName)) {
+ // before determining if it is closed
+ if (prefetchingQueue.isCompleted()) {
+ LOGGER.info(
+ "Subscription: prefetching queue bound to topic [{}] is
completed, return termination response to client",
+ topicName);
+
events.add(prefetchingQueue.generateSubscriptionPollTerminationResponse());
+ continue;
+ }
if (prefetchingQueue.isClosed()) {
LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is
closed", topicName);
continue;
@@ -181,6 +189,11 @@ public class SubscriptionBroker {
// clean up events in prefetching queue, this operation is idempotent
prefetchingQueue.cleanup();
+ // mark prefetching queue completed only for topic of query mode
+ if
(SubscriptionAgent.topic().getTopicMode(topicName).equals(TopicConstant.MODE_QUERY_VALUE))
{
+ prefetchingQueue.markCompleted();
+ }
+
if (doRemove) {
topicNameToPrefetchingQueue.remove(topicName);
SubscriptionPrefetchingQueueMetrics.getInstance()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 9dec709230f..d93f64e2f18 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -25,11 +25,16 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.event.SubscriptionEventBinaryCache;
import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
+import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -124,7 +129,7 @@ public abstract class SubscriptionPrefetchingQueue {
subscriptionCommitIdGenerator.getAndIncrement());
}
- protected SubscriptionCommitContext
generateInvalidSubscriptionCommitContext() {
+ private SubscriptionCommitContext generateInvalidSubscriptionCommitContext()
{
return new SubscriptionCommitContext(
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
PipeAgent.runtime().getRebootTimes(),
@@ -176,4 +181,23 @@ public abstract class SubscriptionPrefetchingQueue {
public void markCompleted() {
isCompleted = true;
}
+
+ public SubscriptionEvent generateSubscriptionPollTerminationResponse() {
+ return new SubscriptionEvent(
+ Collections.emptyList(),
+ new SubscriptionPollResponse(
+ SubscriptionPollResponseType.TERMINATION.getType(),
+ new TerminationPayload(),
+ generateInvalidSubscriptionCommitContext()));
+ }
+
+ public SubscriptionEvent generateSubscriptionPollErrorResponse(
+ final String errorMessage, final boolean critical) {
+ return new SubscriptionEvent(
+ Collections.emptyList(),
+ new SubscriptionPollResponse(
+ SubscriptionPollResponseType.ERROR.getType(),
+ new ErrorPayload(errorMessage, critical),
+ generateInvalidSubscriptionCommitContext()));
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
index 14d3315e1b7..935494537b3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -138,6 +139,17 @@ public class SubscriptionPrefetchingTabletsQueue extends
SubscriptionPrefetching
continue;
}
+ if (event instanceof PipeTerminateEvent) {
+ LOGGER.info(
+ "Subscription: SubscriptionPrefetchingTabletsQueue {} commit
PipeTerminateEvent {}",
+ this,
+ event);
+ // commit directly
+ ((PipeTerminateEvent) event)
+
.decreaseReferenceCount(SubscriptionPrefetchingTsFileQueue.class.getName(),
true);
+ continue;
+ }
+
if (event instanceof TabletInsertionEvent) {
final List<Tablet> currentTablets =
convertToTablets((TabletInsertionEvent) event);
if (currentTablets.isEmpty()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 3f2dac42db6..dbb68809681 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -22,12 +22,13 @@ package org.apache.iotdb.db.subscription.broker;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.event.SubscriptionEventBinaryCache;
import org.apache.iotdb.db.subscription.event.SubscriptionTsFileEvent;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload;
import org.apache.iotdb.rpc.subscription.payload.poll.FileSealPayload;
@@ -42,7 +43,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -80,7 +80,7 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
}
@Override
- public SubscriptionTsFileEvent poll(final String consumerId) {
+ public SubscriptionEvent poll(final String consumerId) {
if (hasUnPollableOnTheFlySubscriptionTsFileEvent(consumerId)) {
return null;
}
@@ -102,6 +102,17 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
continue;
}
+ if (event instanceof PipeTerminateEvent) {
+ LOGGER.info(
+ "Subscription: SubscriptionPrefetchingTsFileQueue {} commit
PipeTerminateEvent {}",
+ this,
+ event);
+ // commit directly
+ ((PipeTerminateEvent) event)
+
.decreaseReferenceCount(SubscriptionPrefetchingTsFileQueue.class.getName(),
true);
+ continue;
+ }
+
if (event instanceof TabletInsertionEvent) {
final String errorMessage =
String.format(
@@ -136,7 +147,7 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
return null;
}
- public synchronized @NonNull SubscriptionTsFileEvent pollTsFile(
+ public synchronized @NonNull SubscriptionEvent pollTsFile(
final String consumerId, final String fileName, final long
writingOffset) {
// 1. Extract current event and check it
final SubscriptionTsFileEvent event =
consumerIdToCurrentEventMap.get(consumerId);
@@ -281,7 +292,7 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
return pollTsFile(consumerId, writingOffset, event);
}
- private synchronized @NonNull SubscriptionTsFileEvent pollTsFile(
+ private synchronized @NonNull SubscriptionEvent pollTsFile(
final String consumerId, final long writingOffset, final
SubscriptionTsFileEvent event) {
Pair<SubscriptionTsFileEvent, Boolean> newEventWithCommittable =
event.matchOrResetNext(writingOffset);
@@ -384,18 +395,8 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
return null;
}
- private SubscriptionTsFileEvent generateSubscriptionPollErrorResponse(
- final String errorMessage, final boolean critical) {
- return new SubscriptionTsFileEvent(
- Collections.emptyList(),
- new SubscriptionPollResponse(
- SubscriptionPollResponseType.ERROR.getType(),
- new ErrorPayload(errorMessage, critical),
- super.generateInvalidSubscriptionCommitContext()));
- }
-
- private SubscriptionTsFileEvent generateSubscriptionPollErrorResponse(final
String errorMessage) {
+ private SubscriptionEvent generateSubscriptionPollErrorResponse(final String
errorMessage) {
// consider non-critical by default, meaning the client can retry
- return generateSubscriptionPollErrorResponse(errorMessage, false);
+ return super.generateSubscriptionPollErrorResponse(errorMessage, false);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
index 43827ab85c4..41e690cbce9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
@@ -98,8 +98,11 @@ public class SubscriptionConnectorSubtaskLifeCycle extends
PipeConnectorSubtaskL
public synchronized void close() {
super.close();
+ // Here, the prefetching queue is not actually removed, because it's uncertain whether the
+ // corresponding underlying pipe is automatically terminated. The actual removal is carried out
+ // when dropping the subscription.
final String consumerGroupId = ((SubscriptionConnectorSubtask)
subtask).getConsumerGroupId();
final String topicName = ((SubscriptionConnectorSubtask)
subtask).getTopicName();
- SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId,
topicName, true);
+ SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId,
topicName, false);
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index a1844bef572..65c1a95aaa3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -103,7 +103,7 @@ public abstract class PipeTaskAgent {
pipeMetaKeeper.acquireWriteLock();
}
- protected boolean tryWriteLockWithTimeOut(long timeOutInSeconds) {
+ protected boolean tryWriteLockWithTimeOut(final long timeOutInSeconds) {
try {
return pipeMetaKeeper.tryWriteLock(timeOutInSeconds);
} catch (final InterruptedException e) {
@@ -462,11 +462,14 @@ public abstract class PipeTaskAgent {
protected abstract Map<Integer, PipeTask> buildPipeTasks(final PipeMeta
pipeMetaFromCoordinator)
throws IllegalPathException;
- protected void dropPipe(final String pipeName, final long creationTime) {
+ /**
+ * @return {@code true} if a pipe has indeed been dropped, otherwise {@code false}.
+ */
+ protected boolean dropPipe(final String pipeName, final long creationTime) {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (!checkBeforeDropPipe(existedPipeMeta, pipeName, creationTime)) {
- return;
+ return false;
}
// Mark pipe meta as dropped first. This will help us detect if the pipe meta has been dropped
@@ -483,7 +486,7 @@ public abstract class PipeTaskAgent {
+ "Skip dropping.",
pipeName,
creationTime);
- return;
+ return false;
}
// Trigger drop() method for each pipe task by parallel stream
@@ -496,13 +499,18 @@ public abstract class PipeTaskAgent {
// Remove pipe meta from pipe meta keeper
pipeMetaKeeper.removePipeMeta(pipeName);
+
+ return true;
}
- protected void dropPipe(final String pipeName) {
+ /**
+ * @return {@code true} if a pipe has indeed been dropped, otherwise {@code false}.
+ */
+ protected boolean dropPipe(final String pipeName) {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (!checkBeforeDropPipe(existedPipeMeta, pipeName)) {
- return;
+ return false;
}
// Mark pipe meta as dropped first. This will help us detect if the pipe meta has been dropped
@@ -516,7 +524,7 @@ public abstract class PipeTaskAgent {
if (pipeTasks == null) {
LOGGER.info(
"Pipe {} has already been dropped or has not been created. Skip
dropping.", pipeName);
- return;
+ return false;
}
// Trigger drop() method for each pipe task by parallel stream
@@ -529,6 +537,8 @@ public abstract class PipeTaskAgent {
// Remove pipe meta from pipe meta keeper
pipeMetaKeeper.removePipeMeta(pipeName);
+
+ return true;
}
private void startPipe(final String pipeName, final long creationTime) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index d53868208bf..764fbfca501 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -48,7 +48,7 @@ public class ConsumerGroupMeta {
}
public ConsumerGroupMeta(
- String consumerGroupId, long creationTime, ConsumerMeta
firstConsumerMeta) {
+ final String consumerGroupId, final long creationTime, final
ConsumerMeta firstConsumerMeta) {
this.consumerGroupId = consumerGroupId;
this.creationTime = creationTime;
this.topicNameToSubscribedConsumerIdSet = new HashMap<>();
@@ -74,20 +74,48 @@ public class ConsumerGroupMeta {
return creationTime;
}
+ public static /* @NonNull */ Set<String> getTopicsUnsubByGroup(
+ final ConsumerGroupMeta currentMeta, final ConsumerGroupMeta
updatedMeta) {
+ if (!Objects.equals(currentMeta.consumerGroupId,
updatedMeta.consumerGroupId)) {
+ return Collections.emptySet();
+ }
+ if (!Objects.equals(currentMeta.creationTime, updatedMeta.creationTime)) {
+ return Collections.emptySet();
+ }
+
+ // no need to check consumerIdToConsumerMeta here to avoid potential
inconsistent meta
+
+ final Set<String> unsubscribedTopicNames = new HashSet<>();
+ currentMeta
+ .topicNameToSubscribedConsumerIdSet
+ .keySet()
+ .forEach(
+ (topicName) -> {
+ if
(!updatedMeta.topicNameToSubscribedConsumerIdSet.containsKey(topicName)) {
+ unsubscribedTopicNames.add(topicName);
+ }
+ });
+ return unsubscribedTopicNames;
+ }
+
/////////////////////////////// consumer ///////////////////////////////
- public void addConsumer(ConsumerMeta consumerMeta) {
+ public void addConsumer(final ConsumerMeta consumerMeta) {
consumerIdToConsumerMeta.put(consumerMeta.getConsumerId(), consumerMeta);
}
- public void removeConsumer(String consumerId) {
+ public void removeConsumer(final String consumerId) {
consumerIdToConsumerMeta.remove(consumerId);
- for (Set<String> subscribedConsumers :
topicNameToSubscribedConsumerIdSet.values()) {
- subscribedConsumers.remove(consumerId);
+ // Drop emptied topics after the loop: removing from this HashMap while
+ // iterating over it can throw ConcurrentModificationException.
+ for (final Set<String> subscribedConsumers :
+ topicNameToSubscribedConsumerIdSet.values()) {
+ subscribedConsumers.remove(consumerId);
}
+ topicNameToSubscribedConsumerIdSet.values().removeIf(Set::isEmpty);
}
- public boolean containsConsumer(String consumerId) {
+ public boolean containsConsumer(final String consumerId) {
return consumerIdToConsumerMeta.containsKey(consumerId);
}
@@ -106,13 +134,13 @@ public class ConsumerGroupMeta {
* @return The set of consumer IDs subscribing the given topic in this
group. If no consumer is
* subscribing the topic, return an empty set.
*/
- public Set<String> getConsumersSubscribingTopic(String topic) {
+ public Set<String> getConsumersSubscribingTopic(final String topic) {
return topicNameToSubscribedConsumerIdSet.getOrDefault(topic,
Collections.emptySet());
}
- public Set<String> getTopicsSubscribedByConsumer(String consumerId) {
- Set<String> topics = new HashSet<>();
- for (Map.Entry<String, Set<String>> topicNameToSubscribedConsumerId :
+ public Set<String> getTopicsSubscribedByConsumer(final String consumerId) {
+ final Set<String> topics = new HashSet<>();
+ for (final Map.Entry<String, Set<String>> topicNameToSubscribedConsumerId :
topicNameToSubscribedConsumerIdSet.entrySet()) {
if (topicNameToSubscribedConsumerId.getValue().contains(consumerId)) {
topics.add(topicNameToSubscribedConsumerId.getKey());
@@ -121,7 +149,7 @@ public class ConsumerGroupMeta {
return topics;
}
- public void addSubscription(String consumerId, Set<String> topics) {
+ public void addSubscription(final String consumerId, final Set<String>
topics) {
if (!consumerIdToConsumerMeta.containsKey(consumerId)) {
throw new SubscriptionException(
String.format(
@@ -129,7 +157,7 @@ public class ConsumerGroupMeta {
consumerId, consumerGroupId));
}
- for (String topic : topics) {
+ for (final String topic : topics) {
topicNameToSubscribedConsumerIdSet
.computeIfAbsent(topic, k -> new HashSet<>())
.add(consumerId);
@@ -139,7 +167,7 @@ public class ConsumerGroupMeta {
/**
* @return topics subscribed by no consumers in this group after this
removal.
*/
- public Set<String> removeSubscription(String consumerId, Set<String> topics)
{
+ public Set<String> removeSubscription(final String consumerId, final
Set<String> topics) {
if (!consumerIdToConsumerMeta.containsKey(consumerId)) {
throw new SubscriptionException(
String.format(
@@ -147,8 +175,8 @@ public class ConsumerGroupMeta {
consumerId, consumerGroupId));
}
- Set<String> noSubscriptionTopicAfterRemoval = new HashSet<>();
- for (String topic : topics) {
+ final Set<String> noSubscriptionTopicAfterRemoval = new HashSet<>();
+ for (final String topic : topics) {
if (topicNameToSubscribedConsumerIdSet.containsKey(topic)) {
topicNameToSubscribedConsumerIdSet.get(topic).remove(consumerId);
if (topicNameToSubscribedConsumerIdSet.get(topic).isEmpty()) {
@@ -163,33 +191,34 @@ public class ConsumerGroupMeta {
/////////////////////////////// de/ser ///////////////////////////////
public ByteBuffer serialize() throws IOException {
- PublicBAOS byteArrayOutputStream = new PublicBAOS();
- DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
+ final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
serialize(outputStream);
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
}
- public void serialize(OutputStream outputStream) throws IOException {
+ public void serialize(final OutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(consumerGroupId, outputStream);
ReadWriteIOUtils.write(creationTime, outputStream);
ReadWriteIOUtils.write(topicNameToSubscribedConsumerIdSet.size(),
outputStream);
- for (Map.Entry<String, Set<String>> entry :
topicNameToSubscribedConsumerIdSet.entrySet()) {
+ for (final Map.Entry<String, Set<String>> entry :
+ topicNameToSubscribedConsumerIdSet.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue().size(), outputStream);
- for (String id : entry.getValue()) {
+ for (final String id : entry.getValue()) {
ReadWriteIOUtils.write(id, outputStream);
}
}
ReadWriteIOUtils.write(consumerIdToConsumerMeta.size(), outputStream);
- for (Map.Entry<String, ConsumerMeta> entry :
consumerIdToConsumerMeta.entrySet()) {
+ for (final Map.Entry<String, ConsumerMeta> entry :
consumerIdToConsumerMeta.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
entry.getValue().serialize(outputStream);
}
}
- public static ConsumerGroupMeta deserialize(InputStream inputStream) throws
IOException {
+ public static ConsumerGroupMeta deserialize(final InputStream inputStream)
throws IOException {
final ConsumerGroupMeta consumerGroupMeta = new ConsumerGroupMeta();
consumerGroupMeta.consumerGroupId =
ReadWriteIOUtils.readString(inputStream);
@@ -220,7 +249,7 @@ public class ConsumerGroupMeta {
return consumerGroupMeta;
}
- public static ConsumerGroupMeta deserialize(ByteBuffer byteBuffer) {
+ public static ConsumerGroupMeta deserialize(final ByteBuffer byteBuffer) {
final ConsumerGroupMeta consumerGroupMeta = new ConsumerGroupMeta();
consumerGroupMeta.consumerGroupId =
ReadWriteIOUtils.readString(byteBuffer);
@@ -254,14 +283,14 @@ public class ConsumerGroupMeta {
/////////////////////////////// Object ///////////////////////////////
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- ConsumerGroupMeta that = (ConsumerGroupMeta) obj;
+ final ConsumerGroupMeta that = (ConsumerGroupMeta) obj;
return Objects.equals(consumerGroupId, that.consumerGroupId)
&& creationTime == that.creationTime
&& Objects.equals(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index a0e80db04e0..70666a07182 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -185,6 +185,8 @@ public class TopicMeta {
extractorAttributes.putAll(config.getAttributesWithTimeRange(creationTime));
// realtime mode
extractorAttributes.putAll(config.getAttributesWithRealtimeMode());
+ // source mode
+ extractorAttributes.putAll(config.getAttributesWithSourceMode());
return extractorAttributes;
}