This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 86ddd749cc0 Subscription: java client supports Create Topic If Not
Exists and Drop Topic If Exists (#13081)
86ddd749cc0 is described below
commit 86ddd749cc00d6e80d7b4421e603c24945dcc18a
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Aug 5 12:38:16 2024 +0800
Subscription: java client supports Create Topic If Not Exists and Drop
Topic If Exists (#13081)
---
.../it/local/IoTDBSubscriptionTopicIT.java | 178 +++++++++++++++++++++
.../session/subscription/SubscriptionSession.java | 94 ++++++++++-
2 files changed, 271 insertions(+), 1 deletion(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicIT.java
new file mode 100644
index 00000000000..55f618b83f4
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.local;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.model.Topic;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionLocalIT {
+
+ @Test
+ public void testBasicCreateTopic() throws Exception {
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
+ session.open();
+ // create topic
+ String topicName = "topic1";
+ session.createTopic(topicName);
+ Assert.assertTrue(session.getTopic(topicName).isPresent());
+ Assert.assertEquals(topicName,
session.getTopic(topicName).get().getTopicName());
+
+ // create topic
+ topicName = "topic2";
+ Properties properties = new Properties();
+ properties.put("path", "root.**");
+ properties.put("start-time", "2023-01-01");
+ properties.put("end-time", "2023-12-31");
+ properties.put("format", "TsFileHandler");
+ session.createTopic(topicName, properties);
+ Optional<Topic> topic = session.getTopic(topicName);
+ Assert.assertTrue(topic.isPresent());
+ Assert.assertEquals(topicName, topic.get().getTopicName());
+ // verify topic parameters
+
Assert.assertTrue(topic.get().getTopicAttributes().contains("path=root.**"));
+
Assert.assertTrue(topic.get().getTopicAttributes().contains("start-time=2023-01-01"));
+
Assert.assertTrue(topic.get().getTopicAttributes().contains("end-time=2023-12-31"));
+
Assert.assertTrue(topic.get().getTopicAttributes().contains("format=TsFileHandler"));
+
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBasicCreateTopicIfNotExists() throws Exception {
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
+ session.open();
+ // create topic if not exits
+ String topicName = "topic3";
+ session.createTopicIfNotExists(topicName);
+ Optional<Topic> topic = session.getTopic(topicName);
+ Assert.assertTrue(topic.isPresent());
+ Assert.assertEquals(topicName, topic.get().getTopicName());
+
+ // create topic if not exits
+ session.createTopicIfNotExists(topicName);
+ topic = session.getTopic(topicName);
+ Assert.assertTrue(topic.isPresent());
+ Assert.assertEquals(topicName, topic.get().getTopicName());
+
+ // create topic if not exits
+ topicName = "topic4";
+ Properties properties = new Properties();
+ properties.put("path", "root.**");
+ properties.put("start-time", "2023-01-01");
+ properties.put("end-time", "2023-12-31");
+ properties.put("format", "TsFileHandler");
+ session.createTopicIfNotExists(topicName, properties);
+ topic = session.getTopic(topicName);
+ Assert.assertTrue(topic.isPresent());
+ Assert.assertEquals(topicName, topic.get().getTopicName());
+ // verify topic parameters
+
Assert.assertTrue(topic.get().getTopicAttributes().contains("path=root.**"));
+
Assert.assertTrue(topic.get().getTopicAttributes().contains("start-time=2023-01-01"));
+
Assert.assertTrue(topic.get().getTopicAttributes().contains("end-time=2023-12-31"));
+
Assert.assertTrue(topic.get().getTopicAttributes().contains("format=TsFileHandler"));
+
+ // create topic if not exits
+ properties.put("start-time", "2023-01-02");
+ session.createTopicIfNotExists(topicName, properties);
+ topic = session.getTopic(topicName);
+ Assert.assertTrue(topic.isPresent());
+ Assert.assertEquals(topicName, topic.get().getTopicName());
+ // Verify Topic Parameters
+
Assert.assertTrue(topic.get().getTopicAttributes().contains("path=root.**"));
+
Assert.assertTrue(topic.get().getTopicAttributes().contains("start-time=2023-01-01"));
+
Assert.assertFalse(topic.get().getTopicAttributes().contains("start-time=2023-01-02"));
+
Assert.assertTrue(topic.get().getTopicAttributes().contains("end-time=2023-12-31"));
+
Assert.assertTrue(topic.get().getTopicAttributes().contains("format=TsFileHandler"));
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBasicDropTopic() throws Exception {
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
+ session.open();
+ // create topic
+ String topicName = "topic5";
+ session.createTopic(topicName);
+
+ // drop topic
+ session.dropTopic(topicName);
+ Assert.assertFalse(session.getTopic(topicName).isPresent());
+
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBasicDropTopicIfExists() throws Exception {
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
+ session.open();
+ // create topic
+ String topicName = "topic6";
+ session.createTopic(topicName);
+
+ // drop topic if exists
+ session.dropTopicIfExists(topicName);
+ Assert.assertFalse(session.getTopic(topicName).isPresent());
+
+ // drop topic if exists
+ session.dropTopicIfExists(topicName);
+ Assert.assertFalse(session.getTopic(topicName).isPresent());
+
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
index d70de467db0..e5fc35c100a 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
@@ -80,14 +80,75 @@ public class SubscriptionSession extends Session {
/////////////////////////////// topic ///////////////////////////////
+ /**
+ * Creates a topic with the specified name.
+ *
+ * <p>If the topic name contains single quotes, it must be enclosed in
backticks (`). For example,
+ * to create a topic named 'topic', the value passed in as topicName should
be `'topic'`
+ *
+ * @param If the created topic name contains single quotes, the passed
parameter needs to be
+ * enclosed in backticks.
+ * @throws IoTDBConnectionException If there is an issue with the connection
to IoTDB.
+ * @throws StatementExecutionException If there is an issue executing the
SQL statement.
+ */
public void createTopic(final String topicName)
throws IoTDBConnectionException, StatementExecutionException {
final String sql = String.format("CREATE TOPIC %s", topicName);
executeNonQueryStatement(sql);
}
+ /**
+ * Creates a topic with the specified name only if it does not already exist.
+ *
+ * <p>This method is similar to {@link #createTopic(String)}, but includes
the 'IF NOT EXISTS'
+ * condition. If the topic name contains single quotes, it must be enclosed
in backticks (`).
+ *
+ * @param If the created topic name contains single quotes, the passed
parameter needs to be
+ * enclosed in backticks.
+ * @throws IoTDBConnectionException If there is an issue with the connection
to IoTDB.
+ * @throws StatementExecutionException If there is an issue executing the
SQL statement.
+ */
+ public void createTopicIfNotExists(final String topicName)
+ throws IoTDBConnectionException, StatementExecutionException {
+ final String sql = String.format("CREATE TOPIC IF NOT EXISTS %s",
topicName);
+ executeNonQueryStatement(sql);
+ }
+
+ /**
+ * Creates a topic with the specified name and properties.
+ *
+ * <p>Topic names with single quotes must be enclosed in backticks (`).
Property keys and values
+ * are included in the SQL statement automatically.
+ *
+ * @param topicName If the created topic name contains single quotes, the
passed parameter needs
+ * to be enclosed in backticks.
+ * @param properties A {@link Properties} object containing the topic's
properties.
+ * @throws IoTDBConnectionException If a connection issue occurs with IoTDB.
+ * @throws StatementExecutionException If a statement execution issue occurs.
+ */
public void createTopic(final String topicName, final Properties properties)
throws IoTDBConnectionException, StatementExecutionException {
+ createTopic(topicName, properties, false);
+ }
+
+ /**
+ * Creates a topic with the specified properties if it does not already
exist. Topic names with
+ * single quotes must be enclosed in backticks (`).
+ *
+ * @param topicName If the created topic name contains single quotes, the
passed parameter needs
+ * to be enclosed in backticks.
+ * @param properties A {@link Properties} object containing the topic's
properties.
+ * @throws IoTDBConnectionException If a connection issue occurs.
+ * @throws StatementExecutionException If the SQL statement execution fails.
+ */
+ public void createTopicIfNotExists(final String topicName, final Properties
properties)
+ throws IoTDBConnectionException, StatementExecutionException {
+ createTopic(topicName, properties, true);
+ }
+
+ private void createTopic(
+ final String topicName, final Properties properties, final boolean
isSetIfNotExistsCondition)
+ throws IoTDBConnectionException, StatementExecutionException {
if (properties.isEmpty()) {
createTopic(topicName);
return;
@@ -106,16 +167,47 @@ public class SubscriptionSession extends Session {
.append(','));
sb.deleteCharAt(sb.length() - 1);
sb.append(')');
- final String sql = String.format("CREATE TOPIC %s WITH %s", topicName, sb);
+ final String sql =
+ isSetIfNotExistsCondition
+ ? String.format("CREATE TOPIC IF NOT EXISTS %s WITH %s",
topicName, sb)
+ : String.format("CREATE TOPIC %s WITH %s", topicName, sb);
executeNonQueryStatement(sql);
}
+ /**
+ * Drops the specified topic.
+ *
+ * <p>This method removes the specified topic from the database. If the
topic name contains single
+ * quotes, it must be enclosed in backticks (`).
+ *
+ * @param topicName The name of the topic to be deleted, if it contains
single quotes, needs to be
+ * enclosed in backticks.
+ * @throws IoTDBConnectionException If there is an issue with the connection
to IoTDB.
+ * @throws StatementExecutionException If there is an issue executing the
SQL statement.
+ */
public void dropTopic(final String topicName)
throws IoTDBConnectionException, StatementExecutionException {
final String sql = String.format("DROP TOPIC %s", topicName);
executeNonQueryStatement(sql);
}
+ /**
+ * Drops the specified topic if it exists.
+ *
+ * <p>This method is similar to {@link #dropTopic(String)}, but includes the
'IF EXISTS'
+ * condition. If the topic name contains single quotes, it must be enclosed
in backticks (`).
+ *
+ * @param topicName The name of the topic to be deleted, if it contains
single quotes, needs to be
+ * enclosed in backticks.
+ * @throws IoTDBConnectionException If there is an issue with the connection
to IoTDB.
+ * @throws StatementExecutionException If there is an issue executing the
SQL statement.
+ */
+ public void dropTopicIfExists(final String topicName)
+ throws IoTDBConnectionException, StatementExecutionException {
+ final String sql = String.format("DROP TOPIC IF EXISTS %s", topicName);
+ executeNonQueryStatement(sql);
+ }
+
public Set<Topic> getTopics() throws IoTDBConnectionException,
StatementExecutionException {
final String sql = "SHOW TOPICS";
try (final SessionDataSet dataSet = executeQueryStatement(sql)) {