This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 96f8c19a80b Subscription: fix NPE when building consumer with missing id & add more checker for topic operations using session (#14493)
96f8c19a80b is described below

commit 96f8c19a80b4567d8d78fe364bba4d0a486e995b
Author: VGalaxies <[email protected]>
AuthorDate: Thu Dec 19 17:39:19 2024 +0800

    Subscription: fix NPE when building consumer with missing id & add more checker for topic operations using session (#14493)
---
 .../it/local/IoTDBSubscriptionBasicIT.java         | 69 ++++++++++++++++++++++
 .../param/IoTDBTestParamPullConsumerIT.java        |  8 ++-
 .../param/IoTDBTestParamPushConsumerIT.java        |  8 ++-
 .../regression/param/IoTDBTestParamTopicIT.java    | 11 ++--
 .../session/subscription/SubscriptionSession.java  |  9 +++
 .../consumer/SubscriptionConsumer.java             | 25 +++++---
 .../consumer/SubscriptionPullConsumer.java         |  4 +-
 .../session/subscription/util/IdentifierUtils.java | 10 +++-
 8 files changed, 124 insertions(+), 20 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index 4fed09711c2..2c937de6aa0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -552,4 +552,73 @@ public class IoTDBSubscriptionBasicIT extends AbstractSubscriptionLocalIT {
       fail(e.getMessage());
     }
   }
+
+  // same to
+  // org.apache.iotdb.subscription.it.local.IoTDBSubscriptionBasicIT.testDataSetDeduplication,
+  // but missing consumer id & consumer group id when building consumer
+  @Test
+  public void testMissingConsumerId() {
+    // Insert some historical data
+    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      session.createDatabase("root.db");
+      for (int i = 0; i < 100; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s1, s2) values (%s, 1, 2)", i));
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d2(time, s1, s2) values (%s, 3, 4)", i));
+      }
+      // DO NOT FLUSH HERE
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Create topic
+    final String topicName = "topic7";
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+    try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
+      session.open();
+      final Properties config = new Properties();
+      config.put(TopicConstant.PATTERN_KEY, "root.db.d1.s1");
+      session.createTopic(topicName, config);
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Subscription
+    final AtomicInteger rowCount = new AtomicInteger();
+    try (final SubscriptionPushConsumer consumer =
+        new SubscriptionPushConsumer.Builder()
+            .host(host)
+            .port(port)
+            .ackStrategy(AckStrategy.AFTER_CONSUME)
+            .consumeListener(
+                message -> {
+                  for (final SubscriptionSessionDataSet dataSet :
+                      message.getSessionDataSetsHandler()) {
+                    while (dataSet.hasNext()) {
+                      dataSet.next();
+                      rowCount.addAndGet(1);
+                    }
+                  }
+                  return ConsumeResult.SUCCESS;
+                })
+            .buildPushConsumer()) {
+
+      consumer.open();
+      consumer.subscribe(topicName);
+
+      AWAIT.untilAsserted(
+          () -> {
+            Assert.assertEquals(100, rowCount.get());
+            Assert.assertNotNull(consumer.getConsumerId());
+            Assert.assertNotNull(consumer.getConsumerGroupId());
+          });
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPullConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPullConsumerIT.java
index c1b853cec07..97ac8b9d071 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPullConsumerIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPullConsumerIT.java
@@ -280,7 +280,9 @@ public class IoTDBTestParamPullConsumerIT extends AbstractSubscriptionRegression
     new SubscriptionPullConsumer(null).open();
   }
 
-  @Test(expected = NullPointerException.class)
+  @Test(
+      expected =
+          SubscriptionConnectionException.class) // connect to TEndPoint(ip:localhost, port:6667)
   public void testCreateConsumer_empty() {
     new SubscriptionPullConsumer(new Properties()).open();
   }
@@ -290,7 +292,7 @@ public class IoTDBTestParamPullConsumerIT extends AbstractSubscriptionRegression
     new SubscriptionPullConsumer.Builder().buildPullConsumer().open();
   }
 
-  @Test(expected = NullPointerException.class)
+  @Test(expected = SubscriptionIdentifierSemanticException.class)
   public void testSubscribe_null() {
     consumer.subscribe((String) null);
   }
@@ -320,7 +322,7 @@ public class IoTDBTestParamPullConsumerIT extends AbstractSubscriptionRegression
     consumer1.close();
   }
 
-  @Test(expected = NullPointerException.class)
+  @Test(expected = SubscriptionIdentifierSemanticException.class)
   public void testUnSubscribe_null() {
     consumer.unsubscribe((String) null);
   }
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPushConsumerIT.java
index e9279a22ca4..ebd76874e06 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPushConsumerIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPushConsumerIT.java
@@ -207,7 +207,9 @@ public class IoTDBTestParamPushConsumerIT extends AbstractSubscriptionRegression
     new SubscriptionPushConsumer(null).open();
   }
 
-  @Test(expected = NullPointerException.class)
+  @Test(
+      expected =
+          SubscriptionConnectionException.class) // connect to TEndPoint(ip:localhost, port:6667)
   public void testCreateConsumer_empty() {
     new SubscriptionPushConsumer(new Properties()).open();
   }
@@ -217,7 +219,7 @@ public class IoTDBTestParamPushConsumerIT extends AbstractSubscriptionRegression
     new SubscriptionPushConsumer.Builder().buildPushConsumer().open();
   }
 
-  @Test(expected = NullPointerException.class)
+  @Test(expected = SubscriptionIdentifierSemanticException.class)
   public void testSubscribe_null() {
     consumer.subscribe((String) null);
   }
@@ -247,7 +249,7 @@ public class IoTDBTestParamPushConsumerIT extends AbstractSubscriptionRegression
     consumer1.close();
   }
 
-  @Test(expected = NullPointerException.class)
+  @Test(expected = SubscriptionIdentifierSemanticException.class)
   public void testUnSubscribe_null() {
     consumer.unsubscribe((String) null);
   }
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamTopicIT.java
index c2636752296..89ca62bfb45 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamTopicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamTopicIT.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionMis
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionIdentifierSemanticException;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
 
@@ -78,13 +79,13 @@ public class IoTDBTestParamTopicIT extends AbstractSubscriptionRegressionIT {
     subs.getTopics().forEach(System.out::println);
   }
 
-  @Test // Will create a topic named null
+  @Test(expected = SubscriptionIdentifierSemanticException.class)
   public void testCreateTopic_null() throws IoTDBConnectionException, StatementExecutionException {
     subs.createTopic(null);
     printTopics("testCreateTopic_null");
   }
 
-  @Test(expected = StatementExecutionException.class)
+  @Test(expected = SubscriptionIdentifierSemanticException.class)
   public void testCreateTopic_emptyString()
       throws IoTDBConnectionException, StatementExecutionException {
     subs.createTopic("");
@@ -98,7 +99,7 @@ public class IoTDBTestParamTopicIT extends AbstractSubscriptionRegressionIT {
     printTopics("testCreateTopic_dup");
   }
 
-  @Test(expected = StatementExecutionException.class)
+  @Test(expected = SubscriptionIdentifierSemanticException.class)
   public void testCreateTopic_invalid()
       throws IoTDBConnectionException, StatementExecutionException {
     subs.createTopic("Topic-1");
@@ -209,12 +210,12 @@ public class IoTDBTestParamTopicIT extends AbstractSubscriptionRegressionIT {
     dropDB(database);
   }
 
-  @Test(expected = StatementExecutionException.class) // drop non-existent topic
+  @Test(expected = SubscriptionIdentifierSemanticException.class)
   public void testDropTopic_null() throws IoTDBConnectionException, StatementExecutionException {
     subs.dropTopic(null);
   }
 
-  @Test(expected = StatementExecutionException.class)
+  @Test(expected = SubscriptionIdentifierSemanticException.class)
   public void testDropTopic_empty() throws IoTDBConnectionException, StatementExecutionException {
     subs.dropTopic("");
   }
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
index 870a91bb297..40c6647ae0a 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.SessionConnection;
 import org.apache.iotdb.session.subscription.model.Subscription;
 import org.apache.iotdb.session.subscription.model.Topic;
+import org.apache.iotdb.session.subscription.util.IdentifierUtils;
 
 import org.apache.tsfile.read.common.Field;
 import org.apache.tsfile.read.common.RowRecord;
@@ -130,6 +131,7 @@ public class SubscriptionSession extends Session {
    */
   public void createTopic(final String topicName)
       throws IoTDBConnectionException, StatementExecutionException {
+    IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
     final String sql = String.format("CREATE TOPIC %s", topicName);
     executeNonQueryStatement(sql);
   }
@@ -147,6 +149,7 @@ public class SubscriptionSession extends Session {
    */
   public void createTopicIfNotExists(final String topicName)
       throws IoTDBConnectionException, StatementExecutionException {
+    IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
     final String sql = String.format("CREATE TOPIC IF NOT EXISTS %s", topicName);
     executeNonQueryStatement(sql);
   }
@@ -165,6 +168,7 @@ public class SubscriptionSession extends Session {
    */
   public void createTopic(final String topicName, final Properties properties)
       throws IoTDBConnectionException, StatementExecutionException {
+    IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
     createTopic(topicName, properties, false);
   }
 
@@ -180,6 +184,7 @@ public class SubscriptionSession extends Session {
    */
   public void createTopicIfNotExists(final String topicName, final Properties properties)
       throws IoTDBConnectionException, StatementExecutionException {
+    IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
     createTopic(topicName, properties, true);
   }
 
@@ -224,6 +229,7 @@ public class SubscriptionSession extends Session {
    */
   public void dropTopic(final String topicName)
       throws IoTDBConnectionException, StatementExecutionException {
+    IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
     final String sql = String.format("DROP TOPIC %s", topicName);
     executeNonQueryStatement(sql);
   }
@@ -241,6 +247,7 @@ public class SubscriptionSession extends Session {
    */
   public void dropTopicIfExists(final String topicName)
       throws IoTDBConnectionException, StatementExecutionException {
+    IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
     final String sql = String.format("DROP TOPIC IF EXISTS %s", topicName);
     executeNonQueryStatement(sql);
   }
@@ -254,6 +261,7 @@ public class SubscriptionSession extends Session {
 
   public Optional<Topic> getTopic(final String topicName)
       throws IoTDBConnectionException, StatementExecutionException {
+    IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
     final String sql = String.format("SHOW TOPIC %s", topicName);
     try (final SessionDataSet dataSet = executeQueryStatement(sql)) {
       final Set<Topic> topics = convertDataSetToTopics(dataSet);
@@ -276,6 +284,7 @@ public class SubscriptionSession extends Session {
 
   public Set<Subscription> getSubscriptions(final String topicName)
       throws IoTDBConnectionException, StatementExecutionException {
+    IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
     final String sql = String.format("SHOW SUBSCRIPTIONS ON %s", topicName);
     try (final SessionDataSet dataSet = executeQueryStatement(sql)) {
       return convertDataSetToSubscriptions(dataSet);
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index c85bdc4b9a6..58dc7c7462d 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.session.subscription.util.PollTimer;
 import org.apache.iotdb.session.subscription.util.RandomStringGenerator;
 import org.apache.iotdb.session.util.SessionUtils;
 
+import org.apache.thrift.annotation.Nullable;
 import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -316,7 +317,9 @@ abstract class SubscriptionConsumer implements AutoCloseable {
 
     if (needParse) {
       topicNames =
-          topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
+          topicNames.stream()
+              .map(IdentifierUtils::checkAndParseIdentifier)
+              .collect(Collectors.toSet());
     }
 
     providers.acquireReadLock();
@@ -346,7 +349,9 @@ abstract class SubscriptionConsumer implements AutoCloseable {
 
     if (needParse) {
       topicNames =
-          topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
+          topicNames.stream()
+              .map(IdentifierUtils::checkAndParseIdentifier)
+              .collect(Collectors.toSet());
     }
 
     providers.acquireReadLock();
@@ -377,7 +382,7 @@ abstract class SubscriptionConsumer implements AutoCloseable {
       } catch (final Exception ignored) {
       }
       throw new SubscriptionConnectionException(
-          String.format("Failed to handshake with subscription provider %s", provider));
+          String.format("Failed to handshake with subscription provider %s", provider), e);
     }
 
     // update consumer id and consumer group id if not exist
@@ -1407,13 +1412,19 @@ abstract class SubscriptionConsumer implements AutoCloseable {
       return this;
     }
 
-    public Builder consumerId(final String consumerId) {
-      this.consumerId = IdentifierUtils.parseIdentifier(consumerId);
+    public Builder consumerId(@Nullable final String consumerId) {
+      if (Objects.isNull(consumerId)) {
+        return this;
+      }
+      this.consumerId = IdentifierUtils.checkAndParseIdentifier(consumerId);
       return this;
     }
 
-    public Builder consumerGroupId(final String consumerGroupId) {
-      this.consumerGroupId = IdentifierUtils.parseIdentifier(consumerGroupId);
+    public Builder consumerGroupId(@Nullable final String consumerGroupId) {
+      if (Objects.isNull(consumerGroupId)) {
+        return this;
+      }
+      this.consumerGroupId = IdentifierUtils.checkAndParseIdentifier(consumerGroupId);
       return this;
     }
 
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index 874659c4339..75d8119ea04 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -156,7 +156,9 @@ public class SubscriptionPullConsumer extends SubscriptionConsumer {
       throws SubscriptionException {
     // parse topic names from external source
     Set<String> parsedTopicNames =
-        topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
+        topicNames.stream()
+            .map(IdentifierUtils::checkAndParseIdentifier)
+            .collect(Collectors.toSet());
 
     if (!parsedTopicNames.isEmpty()) {
       // filter unsubscribed topics
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/IdentifierUtils.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/IdentifierUtils.java
index 9f6d09ef44e..6947ac9ef7b 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/IdentifierUtils.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/IdentifierUtils.java
@@ -24,12 +24,20 @@ import org.apache.iotdb.rpc.subscription.exception.SubscriptionIdentifierSemanti
 import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.read.common.parser.PathVisitor;
 
+import java.util.Objects;
+
 public class IdentifierUtils {
 
   /**
    * refer org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor#parseIdentifier(java.lang.String)
    */
-  public static String parseIdentifier(final String src) {
+  public static String checkAndParseIdentifier(final String src) {
+    if (Objects.isNull(src)) {
+      throw new SubscriptionIdentifierSemanticException("null identifier is not supported");
+    }
+    if (src.isEmpty()) {
+      throw new SubscriptionIdentifierSemanticException("empty identifier is not supported");
+    }
     if (src.startsWith(TsFileConstant.BACK_QUOTE_STRING)
         && src.endsWith(TsFileConstant.BACK_QUOTE_STRING)) {
       return src.substring(1, src.length() - 1)

Reply via email to