This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 99d268fd4e6 Subscription: improve nack for subscribing tsfile & delete
tsfile that failed to poll (#12843)
99d268fd4e6 is described below
commit 99d268fd4e6b9aa610c0950c37bbc2e7bc0de4c2
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Jul 5 17:49:54 2024 +0800
Subscription: improve nack for subscribing tsfile & delete tsfile that
failed to poll (#12843)
---
.../it/dual/IoTDBSubscriptionConsumerGroupIT.java | 80 ++---
.../it/dual/IoTDBSubscriptionTopicIT.java | 177 +++++++---
.../it/local/IoTDBSubscriptionBasicIT.java | 302 +----------------
.../it/local/IoTDBSubscriptionDataTypeIT.java | 368 +++++++++++++++++++++
.../consumer/SubscriptionConsumer.java | 16 +-
.../agent/SubscriptionBrokerAgent.java | 3 +-
.../db/subscription/broker/SubscriptionBroker.java | 8 +-
.../broker/SubscriptionPrefetchingQueue.java | 86 +++--
.../broker/SubscriptionPrefetchingTabletQueue.java | 6 +-
.../broker/SubscriptionPrefetchingTsFileQueue.java | 41 ++-
.../db/subscription/event/SubscriptionEvent.java | 7 +-
11 files changed, 659 insertions(+), 435 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index 9a0d7865afe..eccfc634e6b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -976,54 +976,54 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
for (final SubscriptionMessage message : messages) {
final short messageType = message.getMessageType();
- if
(SubscriptionMessageType.isValidatedMessageType(messageType)) {
- switch (SubscriptionMessageType.valueOf(messageType)) {
- case SESSION_DATA_SETS_HANDLER:
- {
- for (final SubscriptionSessionDataSet dataSet :
- message.getSessionDataSetsHandler()) {
- final List<String> columnNameList =
dataSet.getColumnNames();
- while (dataSet.hasNext()) {
- final RowRecord record = dataSet.next();
- if
(!insertRowRecordEnrichedByConsumerGroupId(
- columnNameList, record.getTimestamp(),
consumerGroupId)) {
- receiverCrashed.set(true);
- throw new RuntimeException("detect
receiver crashed");
- }
+ if
(!SubscriptionMessageType.isValidatedMessageType(messageType)) {
+ LOGGER.warn("unexpected message type: {}",
messageType);
+ continue;
+ }
+ switch (SubscriptionMessageType.valueOf(messageType)) {
+ case SESSION_DATA_SETS_HANDLER:
+ {
+ for (final SubscriptionSessionDataSet dataSet :
+ message.getSessionDataSetsHandler()) {
+ final List<String> columnNameList =
dataSet.getColumnNames();
+ while (dataSet.hasNext()) {
+ final RowRecord record = dataSet.next();
+ if (!insertRowRecordEnrichedByConsumerGroupId(
+ columnNameList, record.getTimestamp(),
consumerGroupId)) {
+ receiverCrashed.set(true);
+ throw new RuntimeException("detect receiver
crashed");
}
}
- break;
}
- case TS_FILE_HANDLER:
- {
- try (final TsFileReader tsFileReader =
- message.getTsFileHandler().openReader()) {
- final QueryDataSet dataSet =
- tsFileReader.query(
- QueryExpression.create(
- Arrays.asList(
- new Path("root.topic1", "s",
true),
- new Path("root.topic2", "s",
true)),
- null));
- while (dataSet.hasNext()) {
- final RowRecord record = dataSet.next();
- for (final Path path : dataSet.getPaths()) {
- if
(!insertRowRecordEnrichedByConsumerGroupId(
- path.toString(),
record.getTimestamp(), consumerGroupId)) {
- receiverCrashed.set(true);
- throw new RuntimeException("detect
receiver crashed");
- }
+ break;
+ }
+ case TS_FILE_HANDLER:
+ {
+ try (final TsFileReader tsFileReader =
+ message.getTsFileHandler().openReader()) {
+ final QueryDataSet dataSet =
+ tsFileReader.query(
+ QueryExpression.create(
+ Arrays.asList(
+ new Path("root.topic1", "s",
true),
+ new Path("root.topic2", "s",
true)),
+ null));
+ while (dataSet.hasNext()) {
+ final RowRecord record = dataSet.next();
+ for (final Path path : dataSet.getPaths()) {
+ if
(!insertRowRecordEnrichedByConsumerGroupId(
+ path.toString(), record.getTimestamp(),
consumerGroupId)) {
+ receiverCrashed.set(true);
+ throw new RuntimeException("detect
receiver crashed");
}
}
}
- break;
}
- default:
- LOGGER.warn("unexpected message type: {}",
messageType);
break;
- }
- } else {
- LOGGER.warn("unexpected message type: {}",
messageType);
+ }
+ default:
+ LOGGER.warn("unexpected message type: {}",
messageType);
+ break;
}
}
consumer.commitSync(messages);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 2f9cb124311..04a1f33772f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -33,6 +33,9 @@ import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.session.subscription.SubscriptionSession;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
+import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import org.apache.tsfile.read.TsFileReader;
@@ -84,7 +87,16 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
@Test
- public void testTopicPathSubscription() throws Exception {
+ public void testTabletTopicWithPath() throws Exception {
+
testTopicWithPathTemplate(TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+ }
+
+ @Test
+ public void testTsFileTopicWithPath() throws Exception {
+ testTopicWithPathTemplate(TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+ }
+
+ private void testTopicWithPathTemplate(final String topicFormat) throws
Exception {
// Insert some historical data on sender
try (final ISession session = senderEnv.getSessionConnection()) {
for (int i = 0; i < 100; ++i) {
@@ -110,6 +122,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
final Properties config = new Properties();
+ config.put(TopicConstant.FORMAT_KEY, topicFormat);
config.put(TopicConstant.PATH_KEY, "root.db.*.s");
session.createTopic(topicName, config);
} catch (final Exception e) {
@@ -138,14 +151,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS);
// wait some time
final List<SubscriptionMessage> messages =
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
- for (final SubscriptionMessage message : messages) {
- for (final Iterator<Tablet> it =
-
message.getSessionDataSetsHandler().tabletIterator();
- it.hasNext(); ) {
- final Tablet tablet = it.next();
- session.insertTablet(tablet);
- }
- }
+ insertData(messages, session);
consumer.commitSync(messages);
}
consumer.unsubscribe(topicName);
@@ -185,7 +191,16 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
@Test
- public void testTopicTimeSubscription() throws Exception {
+ public void testTabletTopicWithTime() throws Exception {
+
testTopicWithTimeTemplate(TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+ }
+
+ @Test
+ public void testTsFileTopicWithTime() throws Exception {
+ testTopicWithTimeTemplate(TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+ }
+
+ private void testTopicWithTimeTemplate(final String topicFormat) throws
Exception {
// Insert some historical data on sender
final long currentTime = System.currentTimeMillis();
try (final ISession session = senderEnv.getSessionConnection()) {
@@ -208,6 +223,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
final Properties config = new Properties();
+ config.put(TopicConstant.FORMAT_KEY, topicFormat);
config.put(TopicConstant.START_TIME_KEY, currentTime);
session.createTopic(topicName, config);
} catch (final Exception e) {
@@ -236,14 +252,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS);
// wait some time
final List<SubscriptionMessage> messages =
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
- for (final SubscriptionMessage message : messages) {
- for (final Iterator<Tablet> it =
-
message.getSessionDataSetsHandler().tabletIterator();
- it.hasNext(); ) {
- final Tablet tablet = it.next();
- session.insertTablet(tablet);
- }
- }
+ insertData(messages, session);
consumer.commitSync(messages);
}
consumer.unsubscribe(topicName);
@@ -282,7 +291,16 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
@Test
- public void testTopicProcessorSubscription() throws Exception {
+ public void testTabletTopicWithProcessor() throws Exception {
+
testTopicWithProcessorTemplate(TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+ }
+
+ @Test
+ public void testTsFileTopicWithProcessor() throws Exception {
+ testTopicWithProcessorTemplate(TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+ }
+
+ private void testTopicWithProcessorTemplate(final String topicFormat) throws
Exception {
// Insert some history data on sender
try (final ISession session = senderEnv.getSessionConnection()) {
session.executeNonQueryStatement(
@@ -300,6 +318,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
final Properties config = new Properties();
+ config.put(TopicConstant.FORMAT_KEY, topicFormat);
config.put("processor", "tumbling-time-sampling-processor");
config.put("processor.tumbling-time.interval-seconds", "1");
config.put("processor.down-sampling.split-file", "true");
@@ -330,14 +349,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS);
// wait some time
final List<SubscriptionMessage> messages =
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
- for (final SubscriptionMessage message : messages) {
- for (final Iterator<Tablet> it =
-
message.getSessionDataSetsHandler().tabletIterator();
- it.hasNext(); ) {
- final Tablet tablet = it.next();
- session.insertTablet(tablet);
- }
- }
+ insertData(messages, session);
consumer.commitSync(messages);
}
consumer.unsubscribe(topicName);
@@ -500,7 +512,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
@Test
- public void testTopicInvalidTimeRangeConfig() throws Exception {
+ public void testTopicWithInvalidTimeConfig() throws Exception {
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
@@ -530,7 +542,16 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
@Test
- public void testTopicWithSnapshotMode() throws Exception {
+ public void testTabletTopicWithSnapshotMode() throws Exception {
+
testTopicWithSnapshotModeTemplate(TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+ }
+
+ @Test
+ public void testTsFileTopicWithSnapshotMode() throws Exception {
+
testTopicWithSnapshotModeTemplate(TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+ }
+
+ private void testTopicWithSnapshotModeTemplate(final String topicFormat)
throws Exception {
// Insert some historical data
try (final ISession session = senderEnv.getSessionConnection()) {
for (int i = 0; i < 100; ++i) {
@@ -544,13 +565,13 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
// Create topic
- final String topicName = "topic11";
+ final String topicName = "topic9";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
final Properties config = new Properties();
- config.put(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+ config.put(TopicConstant.FORMAT_KEY, topicFormat);
config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_SNAPSHOT_VALUE);
session.createTopic(topicName, config);
} catch (final Exception e) {
@@ -580,16 +601,37 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
final List<SubscriptionMessage> messages =
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
for (final SubscriptionMessage message : messages) {
- try (final TsFileReader tsFileReader =
- message.getTsFileHandler().openReader()) {
- final Path path = new Path("root.db.d1", "s1", true);
- final QueryDataSet dataSet =
- tsFileReader.query(
-
QueryExpression.create(Collections.singletonList(path), null));
- while (dataSet.hasNext()) {
- dataSet.next();
- rowCount.addAndGet(1);
- }
+ final short messageType = message.getMessageType();
+ if
(!SubscriptionMessageType.isValidatedMessageType(messageType)) {
+ LOGGER.warn("unexpected message type: {}", messageType);
+ continue;
+ }
+ switch (SubscriptionMessageType.valueOf(messageType)) {
+ case SESSION_DATA_SETS_HANDLER:
+ for (final SubscriptionSessionDataSet dataSet :
+ message.getSessionDataSetsHandler()) {
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ rowCount.addAndGet(1);
+ }
+ }
+ break;
+ case TS_FILE_HANDLER:
+ try (final TsFileReader tsFileReader =
+ message.getTsFileHandler().openReader()) {
+ final Path path = new Path("root.db.d1", "s1", true);
+ final QueryDataSet dataSet =
+ tsFileReader.query(
+
QueryExpression.create(Collections.singletonList(path), null));
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ rowCount.addAndGet(1);
+ }
+ }
+ break;
+ default:
+ LOGGER.warn("unexpected message type: {}",
messageType);
+ break;
}
}
consumer.commitSync(messages);
@@ -633,7 +675,16 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
@Test
- public void testTopicWithLooseRange() throws Exception {
+ public void testTabletTopicWithLooseRange() throws Exception {
+
testTopicWithLooseRangeTemplate(TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+ }
+
+ @Test
+ public void testTsFileTopicWithLooseRange() throws Exception {
+
testTopicWithLooseRangeTemplate(TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+ }
+
+ private void testTopicWithLooseRangeTemplate(final String topicFormat)
throws Exception {
// Insert some historical data on sender
try (final ISession session = senderEnv.getSessionConnection()) {
session.executeNonQueryStatement(
@@ -649,12 +700,13 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
// Create topic
- final String topicName = "topic12";
+ final String topicName = "topic10";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
final Properties config = new Properties();
+ config.put(TopicConstant.FORMAT_KEY, topicFormat);
config.put(TopicConstant.LOOSE_RANGE_KEY,
TopicConstant.LOOSE_RANGE_ALL_VALUE);
config.put(TopicConstant.PATH_KEY, "root.db.d1.at1");
config.put(TopicConstant.START_TIME_KEY, "1500");
@@ -692,14 +744,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
if (dataPrepared.get()) {
final List<SubscriptionMessage> messages =
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
- for (final SubscriptionMessage message : messages) {
- for (final Iterator<Tablet> it =
-
message.getSessionDataSetsHandler().tabletIterator();
- it.hasNext(); ) {
- final Tablet tablet = it.next();
- session.insertTablet(tablet);
- }
- }
+ insertData(messages, session);
consumer.commitSync(messages);
}
}
@@ -762,6 +807,8 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
}
+ /////////////////////////////// utility ///////////////////////////////
+
private void assertTopicCount(final int count) throws Exception {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
@@ -770,4 +817,32 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
Assert.assertEquals(count, showTopicResult.size());
}
}
+
+ private void insertData(final List<SubscriptionMessage> messages, final
ISession session)
+ throws Exception {
+ for (final SubscriptionMessage message : messages) {
+ final short messageType = message.getMessageType();
+ if (!SubscriptionMessageType.isValidatedMessageType(messageType)) {
+ LOGGER.warn("unexpected message type: {}", messageType);
+ continue;
+ }
+ switch (SubscriptionMessageType.valueOf(messageType)) {
+ case SESSION_DATA_SETS_HANDLER:
+ for (final Iterator<Tablet> it =
message.getSessionDataSetsHandler().tabletIterator();
+ it.hasNext(); ) {
+ final Tablet tablet = it.next();
+ session.insertTablet(tablet);
+ }
+ break;
+ case TS_FILE_HANDLER:
+ final SubscriptionTsFileHandler tsFileHandler =
message.getTsFileHandler();
+ session.executeNonQueryStatement(
+ String.format("load '%s'",
tsFileHandler.getFile().getAbsolutePath()));
+ break;
+ default:
+ LOGGER.warn("unexpected message type: {}", messageType);
+ break;
+ }
+ }
+ }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index e9c97c06896..83c5c9d1c85 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -30,21 +30,10 @@ import
org.apache.iotdb.session.subscription.consumer.AsyncCommitCallback;
import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
-import org.apache.iotdb.session.subscription.payload.SubscriptionFileHandler;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
-import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
-import org.apache.tsfile.common.conf.TSFileConfig;
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.read.TsFileReader;
-import org.apache.tsfile.read.common.Path;
-import org.apache.tsfile.read.common.RowRecord;
-import org.apache.tsfile.read.expression.QueryExpression;
-import org.apache.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.tsfile.utils.Binary;
-import org.apache.tsfile.write.record.Tablet;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -52,12 +41,7 @@ import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
import java.time.Duration;
-import java.time.LocalDate;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -75,282 +59,6 @@ public class IoTDBSubscriptionBasicIT extends
AbstractSubscriptionLocalIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionBasicIT.class);
- @Test
- public void testSubscribeBooleanData() throws Exception {
- testBasicSubscribeTablets(TSDataType.BOOLEAN, "true", true);
- }
-
- @Test
- public void testSubscribeIntData() throws Exception {
- testBasicSubscribeTablets(TSDataType.INT32, "1", 1);
- }
-
- @Test
- public void testSubscribeLongData() throws Exception {
- testBasicSubscribeTablets(TSDataType.INT64, "1", 1L);
- }
-
- @Test
- public void testSubscribeFloatData() throws Exception {
- testBasicSubscribeTablets(TSDataType.FLOAT, "1.0", 1.0F);
- }
-
- @Test
- public void testSubscribeDoubleData() throws Exception {
- testBasicSubscribeTablets(TSDataType.DOUBLE, "1.0", 1.0);
- }
-
- @Test
- public void testSubscribeTextData() throws Exception {
- testBasicSubscribeTablets(TSDataType.TEXT, "'a'", new Binary("a",
TSFileConfig.STRING_CHARSET));
- }
-
- @Test
- public void testSubscribeTimestampData() throws Exception {
- testBasicSubscribeTablets(TSDataType.TIMESTAMP, "123", 123L);
- }
-
- @Test
- public void testSubscribeDateData() throws Exception {
- testBasicSubscribeTablets(TSDataType.DATE, "'2011-03-01'",
LocalDate.of(2011, 3, 1));
- }
-
- @Test
- public void testSubscribeBlobData() throws Exception {
- testBasicSubscribeTablets(
- TSDataType.BLOB, "X'f013'", new Binary(new byte[] {(byte) 0xf0,
0x13}));
- }
-
- @Test
- public void testSubscribeStringData() throws Exception {
- testBasicSubscribeTablets(
- TSDataType.STRING, "'a'", new Binary("a",
TSFileConfig.STRING_CHARSET));
- }
-
- private void testBasicSubscribeTablets(
- final TSDataType type, final String valueStr, final Object expectedData)
throws Exception {
- // Insert some historical data
- try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
- session.executeNonQueryStatement(
- String.format("create timeseries root.db.d1.s1 %s",
type.toString()));
- for (int i = 0; i < 100; ++i) {
- session.executeNonQueryStatement(
- String.format("insert into root.db.d1(time, s1) values (%s, %s)",
i, valueStr));
- }
- session.executeNonQueryStatement("flush");
- } catch (final Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- // Create topic
- final String topicName = "topic1";
- final String host = EnvFactory.getEnv().getIP();
- final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
- try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
- session.open();
- session.createTopic(topicName);
- } catch (final Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- // Subscription
- final AtomicInteger rowCount = new AtomicInteger();
- final AtomicBoolean isClosed = new AtomicBoolean(false);
- final Thread thread =
- new Thread(
- () -> {
- try (final SubscriptionPullConsumer consumer =
- new SubscriptionPullConsumer.Builder()
- .host(host)
- .port(port)
- .consumerId("c1")
- .consumerGroupId("cg1")
- .autoCommit(false)
- .buildPullConsumer()) {
- consumer.open();
- consumer.subscribe(topicName);
- while (!isClosed.get()) {
- LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS);
// wait some time
- final List<SubscriptionMessage> messages =
-
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
- for (final SubscriptionMessage message : messages) {
- for (final SubscriptionSessionDataSet dataSet :
- message.getSessionDataSetsHandler()) {
- while (dataSet.hasNext()) {
- final RowRecord record = dataSet.next();
- Assert.assertEquals(type.toString(),
dataSet.getColumnTypes().get(1));
- Assert.assertEquals(type,
record.getFields().get(0).getDataType());
- Assert.assertEquals(expectedData, getValue(type,
dataSet.getTablet()));
- Assert.assertEquals(
- expectedData,
record.getFields().get(0).getObjectValue(type));
- rowCount.addAndGet(1);
- }
- }
- }
- consumer.commitSync(messages);
- }
- consumer.unsubscribe(topicName);
- } catch (final Exception e) {
- e.printStackTrace();
- // Avoid failure
- } finally {
- LOGGER.info("consumer exiting...");
- }
- },
- String.format("%s - consumer", testName.getMethodName()));
- thread.start();
-
- // Check row count
- try {
- // Keep retrying if there are execution failures
- AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
- } catch (final Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- } finally {
- isClosed.set(true);
- thread.join();
- }
- }
-
- private Object getValue(final TSDataType type, final Tablet tablet) {
- switch (type) {
- case BOOLEAN:
- return ((boolean[]) tablet.values[0])[0];
- case INT32:
- return ((int[]) tablet.values[0])[0];
- case INT64:
- case TIMESTAMP:
- return ((long[]) tablet.values[0])[0];
- case FLOAT:
- return ((float[]) tablet.values[0])[0];
- case DOUBLE:
- return ((double[]) tablet.values[0])[0];
- case TEXT:
- case BLOB:
- case STRING:
- return ((Binary[]) tablet.values[0])[0];
- case DATE:
- return ((LocalDate[]) tablet.values[0])[0];
- default:
- return null;
- }
- }
-
- @Test
- public void testBasicSubscribeTsFile() throws Exception {
- // Insert some historical data
- try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
- for (int i = 0; i < 100; ++i) {
- session.executeNonQueryStatement(
- String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
- }
- session.executeNonQueryStatement("flush");
- } catch (final Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- // Create topic
- final String topicName = "topic1";
- final String host = EnvFactory.getEnv().getIP();
- final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
- try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
- session.open();
- final Properties config = new Properties();
- config.put(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
- session.createTopic(topicName, config);
- } catch (final Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- // Record file handlers
- final List<SubscriptionFileHandler> fileHandlers = new ArrayList<>();
-
- // Subscription
- final AtomicInteger rowCount = new AtomicInteger();
- final AtomicBoolean isClosed = new AtomicBoolean(false);
- final Thread thread =
- new Thread(
- () -> {
- try (final SubscriptionPullConsumer consumer =
- new SubscriptionPullConsumer.Builder()
- .host(host)
- .port(port)
- .consumerId("c1")
- .consumerGroupId("cg1")
- .autoCommit(false)
- .fileSaveDir(System.getProperty("java.io.tmpdir")) //
hack for license check
- .buildPullConsumer()) {
- consumer.open();
- consumer.subscribe(topicName);
- while (!isClosed.get()) {
- LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS);
// wait some time
- final List<SubscriptionMessage> messages =
-
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
- for (final SubscriptionMessage message : messages) {
- final SubscriptionTsFileHandler tsFileHandler =
message.getTsFileHandler();
- fileHandlers.add(tsFileHandler);
- try (final TsFileReader tsFileReader =
tsFileHandler.openReader()) {
- final Path path = new Path("root.db.d1", "s1", true);
- final QueryDataSet dataSet =
- tsFileReader.query(
-
QueryExpression.create(Collections.singletonList(path), null));
- while (dataSet.hasNext()) {
- dataSet.next();
- rowCount.addAndGet(1);
- }
- }
- }
- consumer.commitSync(messages);
- }
- consumer.unsubscribe(topicName);
- } catch (final Exception e) {
- e.printStackTrace();
- // Avoid failure
- } finally {
- LOGGER.info("consumer exiting...");
- }
- },
- String.format("%s - consumer", testName.getMethodName()));
- thread.start();
-
- // Check row count
- try {
- // Keep retrying if there are execution failures
- AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
- } catch (final Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- } finally {
- isClosed.set(true);
- thread.join();
- }
-
- // Do something for file handlers
- Assert.assertFalse(fileHandlers.isEmpty());
- final SubscriptionFileHandler fileHandler = fileHandlers.get(0);
- final java.nio.file.Path filePath = fileHandler.getPath();
- Assert.assertTrue(Files.exists(filePath));
-
- // Copy file
- java.nio.file.Path tmpFilePath;
- tmpFilePath = fileHandler.copyFile(Files.createTempFile(null,
null).toAbsolutePath());
- Assert.assertTrue(Files.exists(filePath));
- Assert.assertTrue(Files.exists(tmpFilePath));
-
- // Move file
- tmpFilePath = fileHandler.moveFile(Files.createTempFile(null,
null).toAbsolutePath());
- Assert.assertFalse(Files.exists(filePath));
- Assert.assertTrue(Files.exists(tmpFilePath));
-
- // Delete file
- Assert.assertThrows(NoSuchFileException.class, fileHandler::deleteFile);
- }
-
@Test
public void testBasicPullConsumerWithCommitAsync() throws Exception {
// Insert some historical data
@@ -511,7 +219,7 @@ public class IoTDBSubscriptionBasicIT extends
AbstractSubscriptionLocalIT {
}
// Create topic
- final String topicName = "topic1";
+ final String topicName = "topic2";
final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
@@ -610,6 +318,8 @@ public class IoTDBSubscriptionBasicIT extends
AbstractSubscriptionLocalIT {
}
// Create topic
+ final String topicName3 = "topic3";
+ final String topicName4 = "topic4";
final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
@@ -617,12 +327,12 @@ public class IoTDBSubscriptionBasicIT extends
AbstractSubscriptionLocalIT {
{
final Properties properties = new Properties();
properties.put(TopicConstant.END_TIME_KEY, 99);
- session.createTopic("topic1", properties);
+ session.createTopic(topicName3, properties);
}
{
final Properties properties = new Properties();
properties.put(TopicConstant.START_TIME_KEY, 100);
- session.createTopic("topic2", properties);
+ session.createTopic(topicName4, properties);
}
} catch (final Exception e) {
e.printStackTrace();
@@ -645,7 +355,7 @@ public class IoTDBSubscriptionBasicIT extends
AbstractSubscriptionLocalIT {
.autoCommit(false)
.buildPullConsumer()) {
consumer.open();
- consumer.subscribe("topic2"); // only subscribe topic2
+ consumer.subscribe(topicName4); // only subscribe topic4
while (!isClosed.get()) {
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS);
// wait some time
final List<SubscriptionMessage> messages =
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionDataTypeIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionDataTypeIT.java
new file mode 100644
index 00000000000..413540a89c1
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionDataTypeIT.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.local;
+
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
+import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.TsFileReader;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.common.RowRecord;
+import org.apache.tsfile.read.expression.QueryExpression;
+import org.apache.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.record.Tablet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDate;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+
+import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBSubscriptionDataTypeIT extends AbstractSubscriptionLocalIT {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionDataTypeIT.class);
+
+ // ----------------------------- //
+ // SessionDataSetsHandler format //
+ // ----------------------------- //
+
+ @Test
+ public void testSubscribeTabletBooleanData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE,
TSDataType.BOOLEAN, "true", true);
+ }
+
+ @Test
+ public void testSubscribeTabletIntData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE,
TSDataType.INT32, "1", 1);
+ }
+
+ @Test
+ public void testSubscribeTabletLongData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE,
TSDataType.INT64, "1", 1L);
+ }
+
+ @Test
+ public void testSubscribeTabletFloatData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE,
TSDataType.FLOAT, "1.0", 1.0F);
+ }
+
+ @Test
+ public void testSubscribeTabletDoubleData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE,
TSDataType.DOUBLE, "1.0", 1.0);
+ }
+
+ @Test
+ public void testSubscribeTabletTextData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE,
+ TSDataType.TEXT,
+ "'a'",
+ new Binary("a", TSFileConfig.STRING_CHARSET));
+ }
+
+ @Test
+ public void testSubscribeTabletTimestampData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE,
TSDataType.TIMESTAMP, "123", 123L);
+ }
+
+ @Test
+ public void testSubscribeTabletDateData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE,
+ TSDataType.DATE,
+ "'2011-03-01'",
+ LocalDate.of(2011, 3, 1));
+ }
+
+ @Test
+ public void testSubscribeBlobData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE,
+ TSDataType.BLOB,
+ "X'f013'",
+ new Binary(new byte[] {(byte) 0xf0, 0x13}));
+ }
+
+ @Test
+ public void testSubscribeStringData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE,
+ TSDataType.STRING,
+ "'a'",
+ new Binary("a", TSFileConfig.STRING_CHARSET));
+ }
+
+ // -------------------- //
+ // TsFileHandler format //
+ // -------------------- //
+
+ @Test
+ public void testSubscribeTsFileBooleanData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE, TSDataType.BOOLEAN,
"true", true);
+ }
+
+ @Test
+ public void testSubscribeTsFileIntData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE, TSDataType.INT32, "1", 1);
+ }
+
+ @Test
+ public void testSubscribeTsFileLongData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE, TSDataType.INT64, "1", 1L);
+ }
+
+ @Test
+ public void testSubscribeTsFileFloatData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE, TSDataType.FLOAT, "1.0",
1.0F);
+ }
+
+ @Test
+ public void testSubscribeTsFileDoubleData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE, TSDataType.DOUBLE, "1.0",
1.0);
+ }
+
+ @Test
+ public void testSubscribeTsFileTextData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE,
+ TSDataType.TEXT,
+ "'a'",
+ new Binary("a", TSFileConfig.STRING_CHARSET));
+ }
+
+ @Test
+ public void testSubscribeTsFileTimestampData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE, TSDataType.TIMESTAMP,
"123", 123L);
+ }
+
+ @Test
+ public void testSubscribeTsFileDateData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE,
+ TSDataType.DATE,
+ "'2011-03-01'",
+ LocalDate.of(2011, 3, 1));
+ }
+
+ @Test
+ public void testSubscribeTsFileBlobData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE,
+ TSDataType.BLOB,
+ "X'f013'",
+ new Binary(new byte[] {(byte) 0xf0, 0x13}));
+ }
+
+ @Test
+ public void testSubscribeTsFileStringData() throws Exception {
+ testPullConsumerSubscribeDataTemplate(
+ TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE,
+ TSDataType.STRING,
+ "'a'",
+ new Binary("a", TSFileConfig.STRING_CHARSET));
+ }
+
+ /////////////////////////////// utility ///////////////////////////////
+
+ private void testPullConsumerSubscribeDataTemplate(
+ final String topicFormat,
+ final TSDataType type,
+ final String valueStr,
+ final Object expectedData)
+ throws Exception {
+ // Insert some historical data
+ try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ session.executeNonQueryStatement(
+ String.format("create timeseries root.db.d1.s1 %s",
type.toString()));
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, %s)",
i, valueStr));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Create topic
+ final String topicName = "topic1";
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
+ session.open();
+ final Properties config = new Properties();
+ config.put(TopicConstant.FORMAT_KEY, topicFormat);
+ session.createTopic(topicName, config);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscription
+ final AtomicInteger rowCount = new AtomicInteger();
+ final AtomicBoolean isClosed = new AtomicBoolean(false);
+ final Thread thread =
+ new Thread(
+ () -> {
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .fileSaveDir(System.getProperty("java.io.tmpdir")) //
hack for license check
+ .autoCommit(false)
+ .buildPullConsumer()) {
+ consumer.open();
+ consumer.subscribe(topicName);
+ while (!isClosed.get()) {
+ LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS);
// wait some time
+ final List<SubscriptionMessage> messages =
+
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+ for (final SubscriptionMessage message : messages) {
+ final short messageType = message.getMessageType();
+ if
(!SubscriptionMessageType.isValidatedMessageType(messageType)) {
+ LOGGER.warn("unexpected message type: {}", messageType);
+ continue;
+ }
+ switch (SubscriptionMessageType.valueOf(messageType)) {
+ case SESSION_DATA_SETS_HANDLER:
+ for (final SubscriptionSessionDataSet dataSet :
+ message.getSessionDataSetsHandler()) {
+ while (dataSet.hasNext()) {
+ final RowRecord record = dataSet.next();
+ Assert.assertEquals(type.toString(),
dataSet.getColumnTypes().get(1));
+ Assert.assertEquals(type,
record.getFields().get(0).getDataType());
+ Assert.assertEquals(expectedData, getValue(type,
dataSet.getTablet()));
+ Assert.assertEquals(
+ expectedData,
record.getFields().get(0).getObjectValue(type));
+ rowCount.addAndGet(1);
+ }
+ }
+ break;
+ case TS_FILE_HANDLER:
+ try (final TsFileReader tsFileReader =
+ message.getTsFileHandler().openReader()) {
+ final QueryDataSet dataSet =
+ tsFileReader.query(
+ QueryExpression.create(
+ Collections.singletonList(new
Path("root.db.d1", "s1", true)),
+ null));
+ while (dataSet.hasNext()) {
+ final RowRecord record = dataSet.next();
+ Assert.assertEquals(type,
record.getFields().get(0).getDataType());
+ Assert.assertEquals(
+ expectedData,
record.getFields().get(0).getObjectValue(type));
+ rowCount.addAndGet(1);
+ }
+ }
+ break;
+ default:
+ LOGGER.warn("unexpected message type: {}",
messageType);
+ break;
+ }
+ }
+ consumer.commitSync(messages);
+ }
+ consumer.unsubscribe(topicName);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ // Avoid failure
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+ },
+ String.format("%s - consumer", testName.getMethodName()));
+ thread.start();
+
+ // Check row count
+ try {
+ // Keep retrying if there are execution failures
+ AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ isClosed.set(true);
+ thread.join();
+ }
+ }
+
+ private Object getValue(final TSDataType type, final Tablet tablet) {
+ switch (type) {
+ case BOOLEAN:
+ return ((boolean[]) tablet.values[0])[0];
+ case INT32:
+ return ((int[]) tablet.values[0])[0];
+ case INT64:
+ case TIMESTAMP:
+ return ((long[]) tablet.values[0])[0];
+ case FLOAT:
+ return ((float[]) tablet.values[0])[0];
+ case DOUBLE:
+ return ((double[]) tablet.values[0])[0];
+ case TEXT:
+ case BLOB:
+ case STRING:
+ return ((Binary[]) tablet.values[0])[0];
+ case DATE:
+ return ((LocalDate[]) tablet.values[0])[0];
+ default:
+ return null;
+ }
+ }
+}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index d54ed406774..1dad01aca00 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -41,6 +41,7 @@ import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
import org.apache.iotdb.session.subscription.util.IdentifierUtils;
import org.apache.iotdb.session.subscription.util.RandomStringGenerator;
import org.apache.iotdb.session.subscription.util.SubscriptionPollTimer;
@@ -209,6 +210,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
public synchronized void open() throws SubscriptionException {
checkIfHasBeenClosed();
+
if (!isClosed.get()) {
return;
}
@@ -495,7 +497,11 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
final File file = filePath.toFile();
try (final RandomAccessFile fileWriter = new RandomAccessFile(file, "rw"))
{
return Optional.of(pollFileInternal(commitContext, file, fileWriter));
- } catch (final IOException e) {
+ } catch (final Exception e) {
+ // construct temporary message to nack
+ nack(
+ Collections.singletonList(
+ new SubscriptionMessage(commitContext, file.getAbsolutePath())));
throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
}
}
@@ -740,6 +746,14 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
final Map<Integer, List<SubscriptionCommitContext>>
dataNodeIdToSubscriptionCommitContexts =
new HashMap<>();
for (final SubscriptionMessage message : messages) {
+ // make every effort to delete stale intermediate file
+ if (Objects.equals(
+ SubscriptionMessageType.TS_FILE_HANDLER.getType(),
message.getMessageType())) {
+ try {
+ message.getTsFileHandler().deleteFile();
+ } catch (final Exception ignored) {
+ }
+ }
dataNodeIdToSubscriptionCommitContexts
.computeIfAbsent(message.getCommitContext().getDataNodeId(), (id) ->
new ArrayList<>())
.add(message.getCommitContext());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 21064ac34c8..f47f7cba0b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -94,7 +94,8 @@ public class SubscriptionBrokerAgent {
LOGGER.warn(errorMessage);
throw new SubscriptionException(errorMessage);
}
- return broker.commit(commitContexts, nack);
+ final String consumerId = consumerConfig.getConsumerId();
+ return broker.commit(consumerId, commitContexts, nack);
}
/////////////////////////////// broker ///////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index b4b95895840..45dbeda8d5e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -122,7 +122,9 @@ public class SubscriptionBroker {
* @return list of successful commit contexts
*/
public List<SubscriptionCommitContext> commit(
- final List<SubscriptionCommitContext> commitContexts, final boolean
nack) {
+ final String consumerId,
+ final List<SubscriptionCommitContext> commitContexts,
+ final boolean nack) {
final List<SubscriptionCommitContext> successfulCommitContexts = new
ArrayList<>();
for (final SubscriptionCommitContext commitContext : commitContexts) {
final String topicName = commitContext.getTopicName();
@@ -138,11 +140,11 @@ public class SubscriptionBroker {
continue;
}
if (!nack) {
- if (prefetchingQueue.ack(commitContext)) {
+ if (prefetchingQueue.ack(consumerId, commitContext)) {
successfulCommitContexts.add(commitContext);
}
} else {
- if (prefetchingQueue.nack(commitContext)) {
+ if (prefetchingQueue.nack(consumerId, commitContext)) {
successfulCommitContexts.add(commitContext);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index df06665f162..e4b1c0d8711 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -77,6 +77,21 @@ public abstract class SubscriptionPrefetchingQueue {
this.uncommittedEvents = new ConcurrentHashMap<>();
}
+ public void cleanup() {
+ // clean up uncommitted events
+ uncommittedEvents.values().forEach(SubscriptionEvent::cleanup);
+ uncommittedEvents.clear();
+
+ // no need to clean up events in prefetchingQueue, since all events in
prefetchingQueue are also
+ // in uncommittedEvents
+ prefetchingQueue.clear();
+
+ // no need to clean up events in inputPendingQueue, see
+ //
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.close
+ }
+
+ /////////////////////////////// poll ///////////////////////////////
+
public SubscriptionEvent poll(final String consumerId) {
if (prefetchingQueue.isEmpty()) {
tryPrefetch();
@@ -121,23 +136,12 @@ public abstract class SubscriptionPrefetchingQueue {
return null;
}
- /**
- * @return {@code true} if a new event has been prefetched.
- */
- protected abstract boolean onEvent(final TabletInsertionEvent event);
-
- /**
- * @return {@code true} if a new event has been prefetched.
- */
- protected abstract boolean onEvent(final PipeTsFileInsertionEvent event);
+ /////////////////////////////// prefetch ///////////////////////////////
- /**
- * @return {@code true} if a new event has been prefetched.
- */
- protected abstract boolean trySealBatch();
+ public abstract void executePrefetch();
/**
- * prefetch at most one subscription event from {@link
+ * prefetch at most one {@link SubscriptionEvent} from {@link
* SubscriptionPrefetchingQueue#inputPendingQueue} to {@link
* SubscriptionPrefetchingQueue#prefetchingQueue}
*/
@@ -188,27 +192,27 @@ public abstract class SubscriptionPrefetchingQueue {
}
}
- public abstract void executePrefetch();
-
- public void cleanup() {
- // clean up uncommitted events
- uncommittedEvents.values().forEach(SubscriptionEvent::cleanup);
- uncommittedEvents.clear();
+ /**
+ * @return {@code true} if a new event has been prefetched.
+ */
+ protected abstract boolean onEvent(final TabletInsertionEvent event);
- // no need to clean up events in prefetchingQueue, since all events in
prefetchingQueue are also
- // in uncommittedEvents
- prefetchingQueue.clear();
+ /**
+ * @return {@code true} if a new event has been prefetched.
+ */
+ protected abstract boolean onEvent(final PipeTsFileInsertionEvent event);
- // no need to clean up events in inputPendingQueue, see
- //
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.close
- }
+ /**
+ * @return {@code true} if a new event has been prefetched.
+ */
+ protected abstract boolean trySealBatch();
/////////////////////////////// commit ///////////////////////////////
/**
* @return {@code true} if ack successfully
*/
- public boolean ack(final SubscriptionCommitContext commitContext) {
+ public boolean ack(final String consumerId, final SubscriptionCommitContext
commitContext) {
final SubscriptionEvent event = uncommittedEvents.get(commitContext);
if (Objects.isNull(event)) {
LOGGER.warn(
@@ -236,9 +240,20 @@ public abstract class SubscriptionPrefetchingQueue {
return false;
}
+ // check if a consumer acks event from another consumer group...
+ final String consumerGroupId = commitContext.getConsumerGroupId();
+ if (!Objects.equals(consumerGroupId, brokerId)) {
+ LOGGER.warn(
+ "inconsistent consumer group when acking event, current: {},
incoming: {}, consumer id: {}, event commit context: {}, prefetching queue: {},
commit it anyway...",
+ brokerId,
+ consumerGroupId,
+ consumerId,
+ commitContext,
+ this);
+ }
+
event.ack();
event.cleanup();
-
event.recordCommittedTimestamp();
uncommittedEvents.remove(commitContext);
return true;
@@ -247,7 +262,7 @@ public abstract class SubscriptionPrefetchingQueue {
/**
* @return {@code true} if nack successfully
*/
- public boolean nack(final SubscriptionCommitContext commitContext) {
+ public boolean nack(final String consumerId, final SubscriptionCommitContext
commitContext) {
final SubscriptionEvent event = uncommittedEvents.get(commitContext);
if (Objects.isNull(event)) {
LOGGER.warn(
@@ -256,6 +271,19 @@ public abstract class SubscriptionPrefetchingQueue {
this);
return false;
}
+
+ // check if a consumer nacks event from another consumer group...
+ final String consumerGroupId = commitContext.getConsumerGroupId();
+ if (!Objects.equals(consumerGroupId, brokerId)) {
+ LOGGER.warn(
+ "inconsistent consumer group when nacking event, current: {},
incoming: {}, consumer id: {}, event commit context: {}, prefetching queue: {},
commit it anyway...",
+ brokerId,
+ consumerGroupId,
+ consumerId,
+ commitContext,
+ this);
+ }
+
event.nack();
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 7bd42904d15..900bc4577fc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -81,10 +81,12 @@ public class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQ
});
}
+ /////////////////////////////// prefetch ///////////////////////////////
+
@Override
public void executePrefetch() {
- tryPrefetch();
- serializeEventsInQueue();
+ super.tryPrefetch();
+ this.serializeEventsInQueue();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 43aab7f8189..40c8980d44f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -93,6 +93,8 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
});
}
+ /////////////////////////////// poll ///////////////////////////////
+
@Override
public SubscriptionEvent poll(final String consumerId) {
if (hasUnPollableOnTheFlySubscriptionTsFileEvent(consumerId)) {
@@ -257,6 +259,22 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
return event;
}
+ /////////////////////////////// prefetch ///////////////////////////////
+
+ @Override
+ public synchronized void executePrefetch() {
+ super.tryPrefetch();
+
+ // prefetch remaining subscription events based on {@link
consumerIdToSubscriptionEventMap}
+ for (final SubscriptionEvent event :
consumerIdToSubscriptionEventMap.values()) {
+ try {
+ event.prefetchRemainingResponses();
+ event.trySerializeRemainingResponses();
+ } catch (final IOException ignored) {
+ }
+ }
+ }
+
@Override
protected boolean onEvent(final TabletInsertionEvent event) {
final AtomicBoolean result = new AtomicBoolean(false);
@@ -337,18 +355,21 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
}
}
- @Override
- public synchronized void executePrefetch() {
- tryPrefetch();
+ /////////////////////////////// commit ///////////////////////////////
- // prefetch remaining subscription events based on {@link
consumerIdToSubscriptionEventMap}
- for (final SubscriptionEvent event :
consumerIdToSubscriptionEventMap.values()) {
- try {
- event.prefetchRemainingResponses();
- event.trySerializeRemainingResponses();
- } catch (final IOException ignored) {
+ /**
+ * @return {@code true} if nack successfully
+ */
+ @Override
+ public boolean nack(final String consumerId, final SubscriptionCommitContext
commitContext) {
+ if (super.nack(consumerId, commitContext)) {
+ final SubscriptionEvent event =
consumerIdToSubscriptionEventMap.get(consumerId);
+ if (Objects.nonNull(event) && Objects.equals(commitContext,
event.getCommitContext())) {
+ consumerIdToSubscriptionEventMap.remove(consumerId);
}
+ return true;
}
+ return false;
}
/////////////////////////////// utility ///////////////////////////////
@@ -397,7 +418,7 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
consumerIdToSubscriptionEventMap.remove(entry.getKey());
- event.resetCurrentResponseIndex();
+ event.nack();
consumerIdToSubscriptionEventMap.put(consumerId, event);
event.recordLastPolledConsumerId(consumerId);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index c4cfac361c7..b8926a80316 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -110,8 +110,8 @@ public class SubscriptionEvent {
return responses[index];
}
- public void resetCurrentResponseIndex() {
- currentResponseIndex = 0;
+ public SubscriptionCommitContext getCommitContext() {
+ return commitContext;
}
//////////////////////////// commit ////////////////////////////
@@ -169,6 +169,9 @@ public class SubscriptionEvent {
}
public void nack() {
+ // reset current response index
+ currentResponseIndex = 0;
+
lastPolledConsumerId = null;
lastPolledTimestamp = INVALID_TIMESTAMP;
}