This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d28417bac6d Subscription: fix parsing topic name with back quote when
using JAVA SDK client (#12322)
d28417bac6d is described below
commit d28417bac6d1192dd4d4f07744349d8df7d4d000
Author: V_Galaxy <[email protected]>
AuthorDate: Thu Apr 11 20:05:30 2024 +0800
Subscription: fix parsing topic name with back quote when using JAVA SDK
client (#12322)
---
.../apache/iotdb/SubscriptionSessionExample.java | 18 ++-
.../it/dual/IoTDBSubscriptionTopicIT.java | 153 +++++++++++++++++++--
.../it/local/IoTDBSubscriptionBasicIT.java | 7 +-
.../it/local/IoTDBSubscriptionIdempotentIT.java | 17 ++-
.../it/local/IoTDBSubscriptionRestartIT.java | 21 +--
.../receiver/SubscriptionReceiverV1.java | 5 +
6 files changed, 186 insertions(+), 35 deletions(-)
diff --git
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
index f31eac1df6c..0c674f41b79 100644
---
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
+++
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.session.subscription.SubscriptionSessionDataSet;
import org.apache.iotdb.session.subscription.SubscriptionSessionDataSets;
import java.time.Duration;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -63,10 +64,12 @@ public class SubscriptionSessionExample {
session.executeNonQueryStatement("flush");
// Create topic
+ final String topic1 = "topic1";
+ final String topic2 = "`topic2`";
try (SubscriptionSession subscriptionSession = new
SubscriptionSession(LOCAL_HOST, 6667)) {
subscriptionSession.open();
- subscriptionSession.createTopic("topic1");
- subscriptionSession.createTopic("topic2");
+ subscriptionSession.createTopic(topic1);
+ subscriptionSession.createTopic(topic2);
}
// Subscription: property-style ctor
@@ -75,7 +78,7 @@ public class SubscriptionSessionExample {
config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config);
consumer1.open();
- consumer1.subscribe("topic1");
+ consumer1.subscribe(topic1);
while (true) {
Thread.sleep(1000); // Wait for some time
List<SubscriptionMessage> messages =
consumer1.poll(Duration.ofMillis(10000));
@@ -102,7 +105,7 @@ public class SubscriptionSessionExample {
subscriptionSession.getSubscriptions().forEach((System.out::println));
}
- consumer1.unsubscribe("topic1");
+ consumer1.unsubscribe(topic1);
consumer1.close();
// Subscription: builder-style ctor
@@ -113,10 +116,11 @@ public class SubscriptionSessionExample {
.autoCommit(false)
.buildPullConsumer()) {
consumer2.open();
- consumer2.subscribe("topic2");
+ consumer2.subscribe(topic2);
while (true) {
Thread.sleep(1000); // wait some time
- List<SubscriptionMessage> messages =
consumer2.poll(Duration.ofMillis(10000));
+ List<SubscriptionMessage> messages =
+ consumer2.poll(Collections.singleton(topic2),
Duration.ofMillis(10000));
if (messages.isEmpty()) {
break;
}
@@ -132,7 +136,7 @@ public class SubscriptionSessionExample {
}
consumer2.commitSync(messages);
}
- consumer2.unsubscribe("topic2");
+ consumer2.unsubscribe(topic2);
}
// Query
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index e8cc96c3f12..42fb88adaf4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -78,13 +78,14 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
// Create topic on sender
+ final String topicName = "topic1";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
final Properties config = new Properties();
config.put(TopicConstant.PATH_KEY, "root.db.*.s");
- session.createTopic("topic1", config);
+ session.createTopic(topicName, config);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -105,7 +106,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
.buildPullConsumer();
final ISession session = receiverEnv.getSessionConnection())
{
consumer.open();
- consumer.subscribe("topic1");
+ consumer.subscribe(topicName);
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
@@ -127,7 +128,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
consumer.commitSync(messages);
}
- consumer.unsubscribe("topic1");
+ consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid fail
@@ -184,13 +185,14 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
// Create topic on sender
+ final String topicName = "topic2";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
final Properties config = new Properties();
config.put(TopicConstant.START_TIME_KEY, currentTime);
- session.createTopic("topic1", config);
+ session.createTopic(topicName, config);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -211,7 +213,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
.buildPullConsumer();
final ISession session = receiverEnv.getSessionConnection())
{
consumer.open();
- consumer.subscribe("topic1");
+ consumer.subscribe(topicName);
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
@@ -233,7 +235,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
consumer.commitSync(messages);
}
- consumer.unsubscribe("topic1");
+ consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
@@ -284,6 +286,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
// Create topic
+ final String topicName = "topic3";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
@@ -292,7 +295,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
config.put("processor", "tumbling-time-sampling-processor");
config.put("processor.tumbling-time.interval-seconds", "1");
config.put("processor.down-sampling.split-file", "true");
- session.createTopic("topic1", config);
+ session.createTopic(topicName, config);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -313,7 +316,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
.buildPullConsumer();
final ISession session = receiverEnv.getSessionConnection())
{
consumer.open();
- consumer.subscribe("topic1");
+ consumer.subscribe(topicName);
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
@@ -335,7 +338,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
consumer.commitSync(messages);
}
- consumer.unsubscribe("topic1");
+ consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
@@ -373,4 +376,136 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
thread.join();
}
}
+
+ @Test
+ public void testTopicNameWithBackQuote() throws Exception {
+ // Insert some historical data on sender
+ try (final ISession session = senderEnv.getSessionConnection()) {
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s) values (%s, 1)",
i));
+ }
+ for (int i = 100; i < 200; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s) values (%s, 1)",
i));
+ }
+ for (int i = 200; i < 300; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s) values (%s, 1)",
i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Create topic on sender
+ final String topic1 = "`topic1`";
+ final String topic2 = "`'topic2'`";
+ final String topic3 = "`\"topic3\"`";
+ final String host = senderEnv.getIP();
+ final int port = Integer.parseInt(senderEnv.getPort());
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
+ session.open();
+ {
+ final Properties config = new Properties();
+ config.put(TopicConstant.START_TIME_KEY, 0);
+ config.put(TopicConstant.END_TIME_KEY, 99);
+ session.createTopic(topic1, config);
+ }
+ {
+ final Properties config = new Properties();
+ config.put(TopicConstant.START_TIME_KEY, 100);
+ config.put(TopicConstant.END_TIME_KEY, 199);
+ session.createTopic(topic2, config);
+ }
+ {
+ final Properties config = new Properties();
+ config.put(TopicConstant.START_TIME_KEY, 200);
+ config.put(TopicConstant.END_TIME_KEY, 299);
+ session.createTopic(topic3, config);
+ }
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscribe on sender and insert on receiver
+ final Set<String> topics = new HashSet<>();
+ topics.add(topic1);
+ topics.add(topic2);
+ topics.add(topic3);
+ final AtomicBoolean isClosed = new AtomicBoolean(false);
+ final Thread thread =
+ new Thread(
+ () -> {
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(false)
+ .buildPullConsumer();
+ final ISession session = receiverEnv.getSessionConnection())
{
+ consumer.open();
+ consumer.subscribe(topics);
+ while (!isClosed.get()) {
+ try {
+ Thread.sleep(1000); // wait some time
+ } catch (final InterruptedException e) {
+ break;
+ }
+ final List<SubscriptionMessage> messages =
+ consumer.poll(topics, Duration.ofMillis(10000));
+ if (messages.isEmpty()) {
+ continue;
+ }
+ for (final SubscriptionMessage message : messages) {
+ final SubscriptionSessionDataSets payload =
+ (SubscriptionSessionDataSets) message.getPayload();
+ for (final Iterator<Tablet> it = payload.tabletIterator();
it.hasNext(); ) {
+ final Tablet tablet = it.next();
+ session.insertTablet(tablet);
+ }
+ }
+ consumer.commitSync(messages);
+ }
+ consumer.unsubscribe(topics);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ // Avoid failure
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+ });
+ thread.start();
+
+ // Check data on receiver
+ try {
+ try (final Connection connection = receiverEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ // Keep retrying if there are execution failures
+ Awaitility.await()
+ .pollDelay(1, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .atMost(120, TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
+ TestUtils.assertSingleResultSetEqual(
+ TestUtils.executeQueryWithRetry(statement, "select
count(*) from root.**"),
+ new HashMap<String, String>() {
+ {
+ put("count(root.db.d1.s)", "300");
+ }
+ }));
+ }
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ isClosed.set(true);
+ thread.join();
+ }
+ }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index 93a02c8b836..6938763c8f8 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -78,11 +78,12 @@ public class IoTDBSubscriptionBasicIT {
}
// Create topic
+ final String topicName = "topic1";
final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
- session.createTopic("topic1");
+ session.createTopic(topicName);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -103,7 +104,7 @@ public class IoTDBSubscriptionBasicIT {
.autoCommit(false)
.buildPullConsumer()) {
consumer.open();
- consumer.subscribe("topic1");
+ consumer.subscribe(topicName);
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
@@ -127,7 +128,7 @@ public class IoTDBSubscriptionBasicIT {
}
consumer.commitSync(messages);
}
- consumer.unsubscribe("topic1");
+ consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
index ff5a86bc220..f97d642b384 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
@@ -58,6 +58,7 @@ public class IoTDBSubscriptionIdempotentIT {
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
// Subscribe non-existed topic
+ final String topicName = "topic1";
try (final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(host)
@@ -67,7 +68,7 @@ public class IoTDBSubscriptionIdempotentIT {
.autoCommit(false)
.buildPullConsumer()) {
consumer.open();
- consumer.subscribe("topic1");
+ consumer.subscribe(topicName);
fail();
} catch (final Exception ignored) {
} finally {
@@ -84,7 +85,7 @@ public class IoTDBSubscriptionIdempotentIT {
.autoCommit(false)
.buildPullConsumer()) {
consumer.open();
- consumer.unsubscribe("topic1");
+ consumer.unsubscribe(topicName);
fail();
} catch (final Exception ignored) {
} finally {
@@ -98,9 +99,10 @@ public class IoTDBSubscriptionIdempotentIT {
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
// Create topic
+ final String topicName = "topic2";
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
- session.createTopic("topic1");
+ session.createTopic(topicName);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
@@ -115,9 +117,9 @@ public class IoTDBSubscriptionIdempotentIT {
.autoCommit(false)
.buildPullConsumer()) {
consumer.open();
- consumer.subscribe("topic1");
+ consumer.subscribe(topicName);
// Subscribe existed subscribed topic
- consumer.subscribe("topic1");
+ consumer.subscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
@@ -132,9 +134,10 @@ public class IoTDBSubscriptionIdempotentIT {
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
// Create topic
+ final String topicName = "topic3";
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
- session.createTopic("topic1");
+ session.createTopic(topicName);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
@@ -150,7 +153,7 @@ public class IoTDBSubscriptionIdempotentIT {
.buildPullConsumer()) {
consumer.open();
// Unsubscribe existed non-subscribed topic
- consumer.unsubscribe("topic1");
+ consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
index 9e33a349e27..c0ff9d67466 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
@@ -88,9 +88,10 @@ public class IoTDBSubscriptionRestartIT {
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
// Create topic
+ final String topicName = "topic1";
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
- session.createTopic("topic1");
+ session.createTopic(topicName);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -107,7 +108,7 @@ public class IoTDBSubscriptionRestartIT {
.autoCommit(false)
.buildPullConsumer();
consumer.open();
- consumer.subscribe("topic1");
+ consumer.subscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -181,7 +182,7 @@ public class IoTDBSubscriptionRestartIT {
}
consumer.commitSync(messages);
}
- consumer.unsubscribe("topic1");
+ consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
@@ -215,9 +216,10 @@ public class IoTDBSubscriptionRestartIT {
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
// Create topic
+ final String topicName = "topic2";
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
- session.createTopic("topic1");
+ session.createTopic(topicName);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -237,7 +239,7 @@ public class IoTDBSubscriptionRestartIT {
.endpointsSyncIntervalMs(5000) // narrow endpoints sync interval
.buildPullConsumer();
consumer.open();
- consumer.subscribe("topic1");
+ consumer.subscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -294,7 +296,7 @@ public class IoTDBSubscriptionRestartIT {
// Auto commit
}
}
- consumerRef.unsubscribe("topic1");
+ consumerRef.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
@@ -346,9 +348,10 @@ public class IoTDBSubscriptionRestartIT {
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
// Create topic
+ final String topicName = "topic3";
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
- session.createTopic("topic1");
+ session.createTopic(topicName);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -368,7 +371,7 @@ public class IoTDBSubscriptionRestartIT {
.endpointsSyncIntervalMs(5000) // narrow endpoints sync interval
.buildPullConsumer();
consumer.open();
- consumer.subscribe("topic1");
+ consumer.subscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -420,7 +423,7 @@ public class IoTDBSubscriptionRestartIT {
// Auto commit
}
}
- consumerRef.unsubscribe("topic1");
+ consumerRef.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 8dfaf86d812..01640135b7b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.broker.SerializedEnrichedEvent;
import org.apache.iotdb.db.subscription.timer.SubscriptionPollTimer;
@@ -245,6 +246,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
// subscribe topics
Set<String> topicNames = req.getTopicNames();
+ topicNames =
topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet());
subscribe(consumerConfig, topicNames);
LOGGER.info("Subscription: consumer {} subscribe {} successfully",
consumerConfig, topicNames);
@@ -278,6 +280,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
// unsubscribe topics
Set<String> topicNames = req.getTopicNames();
+ topicNames =
topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet());
unsubscribe(consumerConfig, topicNames);
LOGGER.info(
@@ -316,6 +319,8 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
SubscriptionAgent.consumer()
.getTopicsSubscribedByConsumer(
consumerConfig.getConsumerGroupId(),
consumerConfig.getConsumerId());
+ } else {
+ topicNames =
topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet());
}
SubscriptionPollTimer timer =
new SubscriptionPollTimer(