This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 96f8c19a80b Subscription: fix NPE when building consumer with missing
id & add more checker for topic operations using session (#14493)
96f8c19a80b is described below
commit 96f8c19a80b4567d8d78fe364bba4d0a486e995b
Author: VGalaxies <[email protected]>
AuthorDate: Thu Dec 19 17:39:19 2024 +0800
Subscription: fix NPE when building consumer with missing id & add more
checker for topic operations using session (#14493)
---
.../it/local/IoTDBSubscriptionBasicIT.java | 69 ++++++++++++++++++++++
.../param/IoTDBTestParamPullConsumerIT.java | 8 ++-
.../param/IoTDBTestParamPushConsumerIT.java | 8 ++-
.../regression/param/IoTDBTestParamTopicIT.java | 11 ++--
.../session/subscription/SubscriptionSession.java | 9 +++
.../consumer/SubscriptionConsumer.java | 25 +++++---
.../consumer/SubscriptionPullConsumer.java | 4 +-
.../session/subscription/util/IdentifierUtils.java | 10 +++-
8 files changed, 124 insertions(+), 20 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index 4fed09711c2..2c937de6aa0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -552,4 +552,73 @@ public class IoTDBSubscriptionBasicIT extends AbstractSubscriptionLocalIT {
fail(e.getMessage());
}
}
+
+ // same to
+ // org.apache.iotdb.subscription.it.local.IoTDBSubscriptionBasicIT.testDataSetDeduplication,
+ // but missing consumer id & consumer group id when building consumer
+ @Test
+ public void testMissingConsumerId() {
+ // Insert some historical data
+ try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ session.createDatabase("root.db");
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1, s2) values (%s, 1, 2)", i));
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d2(time, s1, s2) values (%s, 3, 4)", i));
+ }
+ // DO NOT FLUSH HERE
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Create topic
+ final String topicName = "topic7";
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+ try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
+ session.open();
+ final Properties config = new Properties();
+ config.put(TopicConstant.PATTERN_KEY, "root.db.d1.s1");
+ session.createTopic(topicName, config);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscription
+ final AtomicInteger rowCount = new AtomicInteger();
+ try (final SubscriptionPushConsumer consumer =
+ new SubscriptionPushConsumer.Builder()
+ .host(host)
+ .port(port)
+ .ackStrategy(AckStrategy.AFTER_CONSUME)
+ .consumeListener(
+ message -> {
+ for (final SubscriptionSessionDataSet dataSet :
+ message.getSessionDataSetsHandler()) {
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ rowCount.addAndGet(1);
+ }
+ }
+ return ConsumeResult.SUCCESS;
+ })
+ .buildPushConsumer()) {
+
+ consumer.open();
+ consumer.subscribe(topicName);
+
+ AWAIT.untilAsserted(
+ () -> {
+ Assert.assertEquals(100, rowCount.get());
+ Assert.assertNotNull(consumer.getConsumerId());
+ Assert.assertNotNull(consumer.getConsumerGroupId());
+ });
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPullConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPullConsumerIT.java
index c1b853cec07..97ac8b9d071 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPullConsumerIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPullConsumerIT.java
@@ -280,7 +280,9 @@ public class IoTDBTestParamPullConsumerIT extends AbstractSubscriptionRegression
new SubscriptionPullConsumer(null).open();
}
- @Test(expected = NullPointerException.class)
+ @Test(
+ expected =
+ SubscriptionConnectionException.class) // connect to TEndPoint(ip:localhost, port:6667)
public void testCreateConsumer_empty() {
new SubscriptionPullConsumer(new Properties()).open();
}
@@ -290,7 +292,7 @@ public class IoTDBTestParamPullConsumerIT extends AbstractSubscriptionRegression
new SubscriptionPullConsumer.Builder().buildPullConsumer().open();
}
- @Test(expected = NullPointerException.class)
+ @Test(expected = SubscriptionIdentifierSemanticException.class)
public void testSubscribe_null() {
consumer.subscribe((String) null);
}
@@ -320,7 +322,7 @@ public class IoTDBTestParamPullConsumerIT extends AbstractSubscriptionRegression
consumer1.close();
}
- @Test(expected = NullPointerException.class)
+ @Test(expected = SubscriptionIdentifierSemanticException.class)
public void testUnSubscribe_null() {
consumer.unsubscribe((String) null);
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPushConsumerIT.java
index e9279a22ca4..ebd76874e06 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPushConsumerIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPushConsumerIT.java
@@ -207,7 +207,9 @@ public class IoTDBTestParamPushConsumerIT extends AbstractSubscriptionRegression
new SubscriptionPushConsumer(null).open();
}
- @Test(expected = NullPointerException.class)
+ @Test(
+ expected =
+ SubscriptionConnectionException.class) // connect to TEndPoint(ip:localhost, port:6667)
public void testCreateConsumer_empty() {
new SubscriptionPushConsumer(new Properties()).open();
}
@@ -217,7 +219,7 @@ public class IoTDBTestParamPushConsumerIT extends AbstractSubscriptionRegression
new SubscriptionPushConsumer.Builder().buildPushConsumer().open();
}
- @Test(expected = NullPointerException.class)
+ @Test(expected = SubscriptionIdentifierSemanticException.class)
public void testSubscribe_null() {
consumer.subscribe((String) null);
}
@@ -247,7 +249,7 @@ public class IoTDBTestParamPushConsumerIT extends AbstractSubscriptionRegression
consumer1.close();
}
- @Test(expected = NullPointerException.class)
+ @Test(expected = SubscriptionIdentifierSemanticException.class)
public void testUnSubscribe_null() {
consumer.unsubscribe((String) null);
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamTopicIT.java
index c2636752296..89ca62bfb45 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamTopicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamTopicIT.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionMis
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionIdentifierSemanticException;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
@@ -78,13 +79,13 @@ public class IoTDBTestParamTopicIT extends AbstractSubscriptionRegressionIT {
subs.getTopics().forEach(System.out::println);
}
- @Test // Will create a topic named null
+ @Test(expected = SubscriptionIdentifierSemanticException.class)
public void testCreateTopic_null() throws IoTDBConnectionException,
StatementExecutionException {
subs.createTopic(null);
printTopics("testCreateTopic_null");
}
- @Test(expected = StatementExecutionException.class)
+ @Test(expected = SubscriptionIdentifierSemanticException.class)
public void testCreateTopic_emptyString()
throws IoTDBConnectionException, StatementExecutionException {
subs.createTopic("");
@@ -98,7 +99,7 @@ public class IoTDBTestParamTopicIT extends AbstractSubscriptionRegressionIT {
printTopics("testCreateTopic_dup");
}
- @Test(expected = StatementExecutionException.class)
+ @Test(expected = SubscriptionIdentifierSemanticException.class)
public void testCreateTopic_invalid()
throws IoTDBConnectionException, StatementExecutionException {
subs.createTopic("Topic-1");
@@ -209,12 +210,12 @@ public class IoTDBTestParamTopicIT extends AbstractSubscriptionRegressionIT {
dropDB(database);
}
- @Test(expected = StatementExecutionException.class) // drop non-existent topic
+ @Test(expected = SubscriptionIdentifierSemanticException.class)
public void testDropTopic_null() throws IoTDBConnectionException,
StatementExecutionException {
subs.dropTopic(null);
}
- @Test(expected = StatementExecutionException.class)
+ @Test(expected = SubscriptionIdentifierSemanticException.class)
public void testDropTopic_empty() throws IoTDBConnectionException,
StatementExecutionException {
subs.dropTopic("");
}
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
index 870a91bb297..40c6647ae0a 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionConnection;
import org.apache.iotdb.session.subscription.model.Subscription;
import org.apache.iotdb.session.subscription.model.Topic;
+import org.apache.iotdb.session.subscription.util.IdentifierUtils;
import org.apache.tsfile.read.common.Field;
import org.apache.tsfile.read.common.RowRecord;
@@ -130,6 +131,7 @@ public class SubscriptionSession extends Session {
*/
public void createTopic(final String topicName)
throws IoTDBConnectionException, StatementExecutionException {
+ IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
final String sql = String.format("CREATE TOPIC %s", topicName);
executeNonQueryStatement(sql);
}
@@ -147,6 +149,7 @@ public class SubscriptionSession extends Session {
*/
public void createTopicIfNotExists(final String topicName)
throws IoTDBConnectionException, StatementExecutionException {
+ IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
final String sql = String.format("CREATE TOPIC IF NOT EXISTS %s", topicName);
executeNonQueryStatement(sql);
}
@@ -165,6 +168,7 @@ public class SubscriptionSession extends Session {
*/
public void createTopic(final String topicName, final Properties properties)
throws IoTDBConnectionException, StatementExecutionException {
+ IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
createTopic(topicName, properties, false);
}
@@ -180,6 +184,7 @@ public class SubscriptionSession extends Session {
*/
public void createTopicIfNotExists(final String topicName, final Properties properties)
throws IoTDBConnectionException, StatementExecutionException {
+ IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
createTopic(topicName, properties, true);
}
@@ -224,6 +229,7 @@ public class SubscriptionSession extends Session {
*/
public void dropTopic(final String topicName)
throws IoTDBConnectionException, StatementExecutionException {
+ IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
final String sql = String.format("DROP TOPIC %s", topicName);
executeNonQueryStatement(sql);
}
@@ -241,6 +247,7 @@ public class SubscriptionSession extends Session {
*/
public void dropTopicIfExists(final String topicName)
throws IoTDBConnectionException, StatementExecutionException {
+ IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
final String sql = String.format("DROP TOPIC IF EXISTS %s", topicName);
executeNonQueryStatement(sql);
}
@@ -254,6 +261,7 @@ public class SubscriptionSession extends Session {
public Optional<Topic> getTopic(final String topicName)
throws IoTDBConnectionException, StatementExecutionException {
+ IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
final String sql = String.format("SHOW TOPIC %s", topicName);
try (final SessionDataSet dataSet = executeQueryStatement(sql)) {
final Set<Topic> topics = convertDataSetToTopics(dataSet);
@@ -276,6 +284,7 @@ public class SubscriptionSession extends Session {
public Set<Subscription> getSubscriptions(final String topicName)
throws IoTDBConnectionException, StatementExecutionException {
+ IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
final String sql = String.format("SHOW SUBSCRIPTIONS ON %s", topicName);
try (final SessionDataSet dataSet = executeQueryStatement(sql)) {
return convertDataSetToSubscriptions(dataSet);
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index c85bdc4b9a6..58dc7c7462d 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.session.subscription.util.PollTimer;
import org.apache.iotdb.session.subscription.util.RandomStringGenerator;
import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.thrift.annotation.Nullable;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -316,7 +317,9 @@ abstract class SubscriptionConsumer implements AutoCloseable {
if (needParse) {
topicNames =
- topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
+ topicNames.stream()
+ .map(IdentifierUtils::checkAndParseIdentifier)
+ .collect(Collectors.toSet());
}
providers.acquireReadLock();
@@ -346,7 +349,9 @@ abstract class SubscriptionConsumer implements AutoCloseable {
if (needParse) {
topicNames =
- topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
+ topicNames.stream()
+ .map(IdentifierUtils::checkAndParseIdentifier)
+ .collect(Collectors.toSet());
}
providers.acquireReadLock();
@@ -377,7 +382,7 @@ abstract class SubscriptionConsumer implements AutoCloseable {
} catch (final Exception ignored) {
}
throw new SubscriptionConnectionException(
- String.format("Failed to handshake with subscription provider %s", provider));
+ String.format("Failed to handshake with subscription provider %s", provider), e);
}
// update consumer id and consumer group id if not exist
@@ -1407,13 +1412,19 @@ abstract class SubscriptionConsumer implements AutoCloseable {
return this;
}
- public Builder consumerId(final String consumerId) {
- this.consumerId = IdentifierUtils.parseIdentifier(consumerId);
+ public Builder consumerId(@Nullable final String consumerId) {
+ if (Objects.isNull(consumerId)) {
+ return this;
+ }
+ this.consumerId = IdentifierUtils.checkAndParseIdentifier(consumerId);
return this;
}
- public Builder consumerGroupId(final String consumerGroupId) {
- this.consumerGroupId = IdentifierUtils.parseIdentifier(consumerGroupId);
+ public Builder consumerGroupId(@Nullable final String consumerGroupId) {
+ if (Objects.isNull(consumerGroupId)) {
+ return this;
+ }
+ this.consumerGroupId = IdentifierUtils.checkAndParseIdentifier(consumerGroupId);
return this;
}
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index 874659c4339..75d8119ea04 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -156,7 +156,9 @@ public class SubscriptionPullConsumer extends SubscriptionConsumer {
throws SubscriptionException {
// parse topic names from external source
Set<String> parsedTopicNames =
- topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
+ topicNames.stream()
+ .map(IdentifierUtils::checkAndParseIdentifier)
+ .collect(Collectors.toSet());
if (!parsedTopicNames.isEmpty()) {
// filter unsubscribed topics
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/IdentifierUtils.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/IdentifierUtils.java
index 9f6d09ef44e..6947ac9ef7b 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/IdentifierUtils.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/IdentifierUtils.java
@@ -24,12 +24,20 @@ import org.apache.iotdb.rpc.subscription.exception.SubscriptionIdentifierSemanti
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.read.common.parser.PathVisitor;
+import java.util.Objects;
+
public class IdentifierUtils {
/**
* refer org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor#parseIdentifier(java.lang.String)
*/
- public static String parseIdentifier(final String src) {
+ public static String checkAndParseIdentifier(final String src) {
+ if (Objects.isNull(src)) {
+ throw new SubscriptionIdentifierSemanticException("null identifier is not supported");
+ }
+ if (src.isEmpty()) {
+ throw new SubscriptionIdentifierSemanticException("empty identifier is not supported");
+ }
if (src.startsWith(TsFileConstant.BACK_QUOTE_STRING)
&& src.endsWith(TsFileConstant.BACK_QUOTE_STRING)) {
return src.substring(1, src.length() - 1)