This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d28417bac6d Subscription: fix parsing topic name with back quote when 
using JAVA SDK client (#12322)
d28417bac6d is described below

commit d28417bac6d1192dd4d4f07744349d8df7d4d000
Author: V_Galaxy <[email protected]>
AuthorDate: Thu Apr 11 20:05:30 2024 +0800

    Subscription: fix parsing topic name with back quote when using JAVA SDK 
client (#12322)
---
 .../apache/iotdb/SubscriptionSessionExample.java   |  18 ++-
 .../it/dual/IoTDBSubscriptionTopicIT.java          | 153 +++++++++++++++++++--
 .../it/local/IoTDBSubscriptionBasicIT.java         |   7 +-
 .../it/local/IoTDBSubscriptionIdempotentIT.java    |  17 ++-
 .../it/local/IoTDBSubscriptionRestartIT.java       |  21 +--
 .../receiver/SubscriptionReceiverV1.java           |   5 +
 6 files changed, 186 insertions(+), 35 deletions(-)

diff --git 
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
 
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
index f31eac1df6c..0c674f41b79 100644
--- 
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
+++ 
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.session.subscription.SubscriptionSessionDataSet;
 import org.apache.iotdb.session.subscription.SubscriptionSessionDataSets;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
@@ -63,10 +64,12 @@ public class SubscriptionSessionExample {
     session.executeNonQueryStatement("flush");
 
     // Create topic
+    final String topic1 = "topic1";
+    final String topic2 = "`topic2`";
     try (SubscriptionSession subscriptionSession = new 
SubscriptionSession(LOCAL_HOST, 6667)) {
       subscriptionSession.open();
-      subscriptionSession.createTopic("topic1");
-      subscriptionSession.createTopic("topic2");
+      subscriptionSession.createTopic(topic1);
+      subscriptionSession.createTopic(topic2);
     }
 
     // Subscription: property-style ctor
@@ -75,7 +78,7 @@ public class SubscriptionSessionExample {
     config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
     SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config);
     consumer1.open();
-    consumer1.subscribe("topic1");
+    consumer1.subscribe(topic1);
     while (true) {
       Thread.sleep(1000); // Wait for some time
       List<SubscriptionMessage> messages = 
consumer1.poll(Duration.ofMillis(10000));
@@ -102,7 +105,7 @@ public class SubscriptionSessionExample {
       subscriptionSession.getSubscriptions().forEach((System.out::println));
     }
 
-    consumer1.unsubscribe("topic1");
+    consumer1.unsubscribe(topic1);
     consumer1.close();
 
     // Subscription: builder-style ctor
@@ -113,10 +116,11 @@ public class SubscriptionSessionExample {
             .autoCommit(false)
             .buildPullConsumer()) {
       consumer2.open();
-      consumer2.subscribe("topic2");
+      consumer2.subscribe(topic2);
       while (true) {
         Thread.sleep(1000); // wait some time
-        List<SubscriptionMessage> messages = 
consumer2.poll(Duration.ofMillis(10000));
+        List<SubscriptionMessage> messages =
+            consumer2.poll(Collections.singleton(topic2), 
Duration.ofMillis(10000));
         if (messages.isEmpty()) {
           break;
         }
@@ -132,7 +136,7 @@ public class SubscriptionSessionExample {
         }
         consumer2.commitSync(messages);
       }
-      consumer2.unsubscribe("topic2");
+      consumer2.unsubscribe(topic2);
     }
 
     // Query
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index e8cc96c3f12..42fb88adaf4 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -78,13 +78,14 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     }
 
     // Create topic on sender
+    final String topicName = "topic1";
     final String host = senderEnv.getIP();
     final int port = Integer.parseInt(senderEnv.getPort());
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
       final Properties config = new Properties();
       config.put(TopicConstant.PATH_KEY, "root.db.*.s");
-      session.createTopic("topic1", config);
+      session.createTopic(topicName, config);
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -105,7 +106,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                           .buildPullConsumer();
                   final ISession session = receiverEnv.getSessionConnection()) 
{
                 consumer.open();
-                consumer.subscribe("topic1");
+                consumer.subscribe(topicName);
                 while (!isClosed.get()) {
                   try {
                     Thread.sleep(1000); // wait some time
@@ -127,7 +128,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                   }
                   consumer.commitSync(messages);
                 }
-                consumer.unsubscribe("topic1");
+                consumer.unsubscribe(topicName);
               } catch (final Exception e) {
                 e.printStackTrace();
                 // Avoid fail
@@ -184,13 +185,14 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     }
 
     // Create topic on sender
+    final String topicName = "topic2";
     final String host = senderEnv.getIP();
     final int port = Integer.parseInt(senderEnv.getPort());
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
       final Properties config = new Properties();
       config.put(TopicConstant.START_TIME_KEY, currentTime);
-      session.createTopic("topic1", config);
+      session.createTopic(topicName, config);
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -211,7 +213,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                           .buildPullConsumer();
                   final ISession session = receiverEnv.getSessionConnection()) 
{
                 consumer.open();
-                consumer.subscribe("topic1");
+                consumer.subscribe(topicName);
                 while (!isClosed.get()) {
                   try {
                     Thread.sleep(1000); // wait some time
@@ -233,7 +235,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                   }
                   consumer.commitSync(messages);
                 }
-                consumer.unsubscribe("topic1");
+                consumer.unsubscribe(topicName);
               } catch (final Exception e) {
                 e.printStackTrace();
                 // Avoid failure
@@ -284,6 +286,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     }
 
     // Create topic
+    final String topicName = "topic3";
     final String host = senderEnv.getIP();
     final int port = Integer.parseInt(senderEnv.getPort());
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
@@ -292,7 +295,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       config.put("processor", "tumbling-time-sampling-processor");
       config.put("processor.tumbling-time.interval-seconds", "1");
       config.put("processor.down-sampling.split-file", "true");
-      session.createTopic("topic1", config);
+      session.createTopic(topicName, config);
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -313,7 +316,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                           .buildPullConsumer();
                   final ISession session = receiverEnv.getSessionConnection()) 
{
                 consumer.open();
-                consumer.subscribe("topic1");
+                consumer.subscribe(topicName);
                 while (!isClosed.get()) {
                   try {
                     Thread.sleep(1000); // wait some time
@@ -335,7 +338,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                   }
                   consumer.commitSync(messages);
                 }
-                consumer.unsubscribe("topic1");
+                consumer.unsubscribe(topicName);
               } catch (final Exception e) {
                 e.printStackTrace();
                 // Avoid failure
@@ -373,4 +376,136 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       thread.join();
     }
   }
+
+  @Test
+  public void testTopicNameWithBackQuote() throws Exception {
+    // Insert some historical data on sender
+    try (final ISession session = senderEnv.getSessionConnection()) {
+      for (int i = 0; i < 100; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s) values (%s, 1)", 
i));
+      }
+      for (int i = 100; i < 200; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s) values (%s, 1)", 
i));
+      }
+      for (int i = 200; i < 300; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s) values (%s, 1)", 
i));
+      }
+      session.executeNonQueryStatement("flush");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Create topic on sender
+    final String topic1 = "`topic1`";
+    final String topic2 = "`'topic2'`";
+    final String topic3 = "`\"topic3\"`";
+    final String host = senderEnv.getIP();
+    final int port = Integer.parseInt(senderEnv.getPort());
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      {
+        final Properties config = new Properties();
+        config.put(TopicConstant.START_TIME_KEY, 0);
+        config.put(TopicConstant.END_TIME_KEY, 99);
+        session.createTopic(topic1, config);
+      }
+      {
+        final Properties config = new Properties();
+        config.put(TopicConstant.START_TIME_KEY, 100);
+        config.put(TopicConstant.END_TIME_KEY, 199);
+        session.createTopic(topic2, config);
+      }
+      {
+        final Properties config = new Properties();
+        config.put(TopicConstant.START_TIME_KEY, 200);
+        config.put(TopicConstant.END_TIME_KEY, 299);
+        session.createTopic(topic3, config);
+      }
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Subscribe on sender and insert on receiver
+    final Set<String> topics = new HashSet<>();
+    topics.add(topic1);
+    topics.add(topic2);
+    topics.add(topic3);
+    final AtomicBoolean isClosed = new AtomicBoolean(false);
+    final Thread thread =
+        new Thread(
+            () -> {
+              try (final SubscriptionPullConsumer consumer =
+                      new SubscriptionPullConsumer.Builder()
+                          .host(host)
+                          .port(port)
+                          .consumerId("c1")
+                          .consumerGroupId("cg1")
+                          .autoCommit(false)
+                          .buildPullConsumer();
+                  final ISession session = receiverEnv.getSessionConnection()) 
{
+                consumer.open();
+                consumer.subscribe(topics);
+                while (!isClosed.get()) {
+                  try {
+                    Thread.sleep(1000); // wait some time
+                  } catch (final InterruptedException e) {
+                    break;
+                  }
+                  final List<SubscriptionMessage> messages =
+                      consumer.poll(topics, Duration.ofMillis(10000));
+                  if (messages.isEmpty()) {
+                    continue;
+                  }
+                  for (final SubscriptionMessage message : messages) {
+                    final SubscriptionSessionDataSets payload =
+                        (SubscriptionSessionDataSets) message.getPayload();
+                    for (final Iterator<Tablet> it = payload.tabletIterator(); 
it.hasNext(); ) {
+                      final Tablet tablet = it.next();
+                      session.insertTablet(tablet);
+                    }
+                  }
+                  consumer.commitSync(messages);
+                }
+                consumer.unsubscribe(topics);
+              } catch (final Exception e) {
+                e.printStackTrace();
+                // Avoid failure
+              } finally {
+                LOGGER.info("consumer exiting...");
+              }
+            });
+    thread.start();
+
+    // Check data on receiver
+    try {
+      try (final Connection connection = receiverEnv.getConnection();
+          final Statement statement = connection.createStatement()) {
+        // Keep retrying if there are execution failures
+        Awaitility.await()
+            .pollDelay(1, TimeUnit.SECONDS)
+            .pollInterval(1, TimeUnit.SECONDS)
+            .atMost(120, TimeUnit.SECONDS)
+            .untilAsserted(
+                () ->
+                    TestUtils.assertSingleResultSetEqual(
+                        TestUtils.executeQueryWithRetry(statement, "select 
count(*) from root.**"),
+                        new HashMap<String, String>() {
+                          {
+                            put("count(root.db.d1.s)", "300");
+                          }
+                        }));
+      }
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      isClosed.set(true);
+      thread.join();
+    }
+  }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index 93a02c8b836..6938763c8f8 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -78,11 +78,12 @@ public class IoTDBSubscriptionBasicIT {
     }
 
     // Create topic
+    final String topicName = "topic1";
     final String host = EnvFactory.getEnv().getIP();
     final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
-      session.createTopic("topic1");
+      session.createTopic(topicName);
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -103,7 +104,7 @@ public class IoTDBSubscriptionBasicIT {
                       .autoCommit(false)
                       .buildPullConsumer()) {
                 consumer.open();
-                consumer.subscribe("topic1");
+                consumer.subscribe(topicName);
                 while (!isClosed.get()) {
                   try {
                     Thread.sleep(1000); // wait some time
@@ -127,7 +128,7 @@ public class IoTDBSubscriptionBasicIT {
                   }
                   consumer.commitSync(messages);
                 }
-                consumer.unsubscribe("topic1");
+                consumer.unsubscribe(topicName);
               } catch (final Exception e) {
                 e.printStackTrace();
                 // Avoid failure
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
index ff5a86bc220..f97d642b384 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
@@ -58,6 +58,7 @@ public class IoTDBSubscriptionIdempotentIT {
     final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
 
     // Subscribe non-existed topic
+    final String topicName = "topic1";
     try (final SubscriptionPullConsumer consumer =
         new SubscriptionPullConsumer.Builder()
             .host(host)
@@ -67,7 +68,7 @@ public class IoTDBSubscriptionIdempotentIT {
             .autoCommit(false)
             .buildPullConsumer()) {
       consumer.open();
-      consumer.subscribe("topic1");
+      consumer.subscribe(topicName);
       fail();
     } catch (final Exception ignored) {
     } finally {
@@ -84,7 +85,7 @@ public class IoTDBSubscriptionIdempotentIT {
             .autoCommit(false)
             .buildPullConsumer()) {
       consumer.open();
-      consumer.unsubscribe("topic1");
+      consumer.unsubscribe(topicName);
       fail();
     } catch (final Exception ignored) {
     } finally {
@@ -98,9 +99,10 @@ public class IoTDBSubscriptionIdempotentIT {
     final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
 
     // Create topic
+    final String topicName = "topic2";
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
-      session.createTopic("topic1");
+      session.createTopic(topicName);
     } catch (final Exception e) {
       e.printStackTrace();
       Assert.fail(e.getMessage());
@@ -115,9 +117,9 @@ public class IoTDBSubscriptionIdempotentIT {
             .autoCommit(false)
             .buildPullConsumer()) {
       consumer.open();
-      consumer.subscribe("topic1");
+      consumer.subscribe(topicName);
       // Subscribe existed subscribed topic
-      consumer.subscribe("topic1");
+      consumer.subscribe(topicName);
     } catch (final Exception e) {
       e.printStackTrace();
       Assert.fail(e.getMessage());
@@ -132,9 +134,10 @@ public class IoTDBSubscriptionIdempotentIT {
     final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
 
     // Create topic
+    final String topicName = "topic3";
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
-      session.createTopic("topic1");
+      session.createTopic(topicName);
     } catch (final Exception e) {
       e.printStackTrace();
       Assert.fail(e.getMessage());
@@ -150,7 +153,7 @@ public class IoTDBSubscriptionIdempotentIT {
             .buildPullConsumer()) {
       consumer.open();
       // Unsubscribe existed non-subscribed topic
-      consumer.unsubscribe("topic1");
+      consumer.unsubscribe(topicName);
     } catch (final Exception e) {
       e.printStackTrace();
       Assert.fail(e.getMessage());
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
index 9e33a349e27..c0ff9d67466 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
@@ -88,9 +88,10 @@ public class IoTDBSubscriptionRestartIT {
     final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
 
     // Create topic
+    final String topicName = "topic1";
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
-      session.createTopic("topic1");
+      session.createTopic(topicName);
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -107,7 +108,7 @@ public class IoTDBSubscriptionRestartIT {
               .autoCommit(false)
               .buildPullConsumer();
       consumer.open();
-      consumer.subscribe("topic1");
+      consumer.subscribe(topicName);
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -181,7 +182,7 @@ public class IoTDBSubscriptionRestartIT {
                   }
                   consumer.commitSync(messages);
                 }
-                consumer.unsubscribe("topic1");
+                consumer.unsubscribe(topicName);
               } catch (final Exception e) {
                 e.printStackTrace();
                 // Avoid failure
@@ -215,9 +216,10 @@ public class IoTDBSubscriptionRestartIT {
     final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
 
     // Create topic
+    final String topicName = "topic2";
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
-      session.createTopic("topic1");
+      session.createTopic(topicName);
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -237,7 +239,7 @@ public class IoTDBSubscriptionRestartIT {
               .endpointsSyncIntervalMs(5000) // narrow endpoints sync interval
               .buildPullConsumer();
       consumer.open();
-      consumer.subscribe("topic1");
+      consumer.subscribe(topicName);
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -294,7 +296,7 @@ public class IoTDBSubscriptionRestartIT {
                     // Auto commit
                   }
                 }
-                consumerRef.unsubscribe("topic1");
+                consumerRef.unsubscribe(topicName);
               } catch (final Exception e) {
                 e.printStackTrace();
                 // Avoid failure
@@ -346,9 +348,10 @@ public class IoTDBSubscriptionRestartIT {
     final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
 
     // Create topic
+    final String topicName = "topic3";
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
-      session.createTopic("topic1");
+      session.createTopic(topicName);
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -368,7 +371,7 @@ public class IoTDBSubscriptionRestartIT {
               .endpointsSyncIntervalMs(5000) // narrow endpoints sync interval
               .buildPullConsumer();
       consumer.open();
-      consumer.subscribe("topic1");
+      consumer.subscribe(topicName);
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -420,7 +423,7 @@ public class IoTDBSubscriptionRestartIT {
                     // Auto commit
                   }
                 }
-                consumerRef.unsubscribe("topic1");
+                consumerRef.unsubscribe(topicName);
               } catch (final Exception e) {
                 e.printStackTrace();
                 // Avoid failure
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 8dfaf86d812..01640135b7b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.broker.SerializedEnrichedEvent;
 import org.apache.iotdb.db.subscription.timer.SubscriptionPollTimer;
@@ -245,6 +246,7 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
 
     // subscribe topics
     Set<String> topicNames = req.getTopicNames();
+    topicNames = 
topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet());
     subscribe(consumerConfig, topicNames);
 
     LOGGER.info("Subscription: consumer {} subscribe {} successfully", 
consumerConfig, topicNames);
@@ -278,6 +280,7 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
 
     // unsubscribe topics
     Set<String> topicNames = req.getTopicNames();
+    topicNames = 
topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet());
     unsubscribe(consumerConfig, topicNames);
 
     LOGGER.info(
@@ -316,6 +319,8 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
           SubscriptionAgent.consumer()
               .getTopicsSubscribedByConsumer(
                   consumerConfig.getConsumerGroupId(), 
consumerConfig.getConsumerId());
+    } else {
+      topicNames = 
topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet());
     }
     SubscriptionPollTimer timer =
         new SubscriptionPollTimer(

Reply via email to