This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 02657b7b0dc134c81ddadce530a5b0a2a2171ecf
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Aug 5 12:38:16 2024 +0800

    Subscription: java client supports Create Topic If Not Exists and Drop 
Topic If Exists (#13081)
    
    (cherry picked from commit 86ddd749cc00d6e80d7b4421e603c24945dcc18a)
---
 .../it/local/IoTDBSubscriptionTopicIT.java         | 178 +++++++++++++++++++++
 .../session/subscription/SubscriptionSession.java  |  94 ++++++++++-
 2 files changed, 271 insertions(+), 1 deletion(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicIT.java
new file mode 100644
index 00000000000..55f618b83f4
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.local;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.model.Topic;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionLocalIT {
+
+  @Test
+  public void testBasicCreateTopic() throws Exception {
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      // create topic
+      String topicName = "topic1";
+      session.createTopic(topicName);
+      Assert.assertTrue(session.getTopic(topicName).isPresent());
+      Assert.assertEquals(topicName, 
session.getTopic(topicName).get().getTopicName());
+
+      // create topic
+      topicName = "topic2";
+      Properties properties = new Properties();
+      properties.put("path", "root.**");
+      properties.put("start-time", "2023-01-01");
+      properties.put("end-time", "2023-12-31");
+      properties.put("format", "TsFileHandler");
+      session.createTopic(topicName, properties);
+      Optional<Topic> topic = session.getTopic(topicName);
+      Assert.assertTrue(topic.isPresent());
+      Assert.assertEquals(topicName, topic.get().getTopicName());
+      // verify topic parameters
+      
Assert.assertTrue(topic.get().getTopicAttributes().contains("path=root.**"));
+      
Assert.assertTrue(topic.get().getTopicAttributes().contains("start-time=2023-01-01"));
+      
Assert.assertTrue(topic.get().getTopicAttributes().contains("end-time=2023-12-31"));
+      
Assert.assertTrue(topic.get().getTopicAttributes().contains("format=TsFileHandler"));
+
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testBasicCreateTopicIfNotExists() throws Exception {
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      // create topic if not exists
+      String topicName = "topic3";
+      session.createTopicIfNotExists(topicName);
+      Optional<Topic> topic = session.getTopic(topicName);
+      Assert.assertTrue(topic.isPresent());
+      Assert.assertEquals(topicName, topic.get().getTopicName());
+
+      // create topic if not exists again (topic3 already exists)
+      session.createTopicIfNotExists(topicName);
+      topic = session.getTopic(topicName);
+      Assert.assertTrue(topic.isPresent());
+      Assert.assertEquals(topicName, topic.get().getTopicName());
+
+      // create topic if not exists, with properties
+      topicName = "topic4";
+      Properties properties = new Properties();
+      properties.put("path", "root.**");
+      properties.put("start-time", "2023-01-01");
+      properties.put("end-time", "2023-12-31");
+      properties.put("format", "TsFileHandler");
+      session.createTopicIfNotExists(topicName, properties);
+      topic = session.getTopic(topicName);
+      Assert.assertTrue(topic.isPresent());
+      Assert.assertEquals(topicName, topic.get().getTopicName());
+      // verify topic parameters
+      
Assert.assertTrue(topic.get().getTopicAttributes().contains("path=root.**"));
+      
Assert.assertTrue(topic.get().getTopicAttributes().contains("start-time=2023-01-01"));
+      
Assert.assertTrue(topic.get().getTopicAttributes().contains("end-time=2023-12-31"));
+      
Assert.assertTrue(topic.get().getTopicAttributes().contains("format=TsFileHandler"));
+
+      // create topic if not exists again, with a changed start-time (topic4 already exists)
+      properties.put("start-time", "2023-01-02");
+      session.createTopicIfNotExists(topicName, properties);
+      topic = session.getTopic(topicName);
+      Assert.assertTrue(topic.isPresent());
+      Assert.assertEquals(topicName, topic.get().getTopicName());
+      // verify topic parameters: the original start-time must be kept
+      
Assert.assertTrue(topic.get().getTopicAttributes().contains("path=root.**"));
+      
Assert.assertTrue(topic.get().getTopicAttributes().contains("start-time=2023-01-01"));
+      
Assert.assertFalse(topic.get().getTopicAttributes().contains("start-time=2023-01-02"));
+      
Assert.assertTrue(topic.get().getTopicAttributes().contains("end-time=2023-12-31"));
+      
Assert.assertTrue(topic.get().getTopicAttributes().contains("format=TsFileHandler"));
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testBasicDropTopic() throws Exception {
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      // create topic
+      String topicName = "topic5";
+      session.createTopic(topicName);
+
+      // drop topic
+      session.dropTopic(topicName);
+      Assert.assertFalse(session.getTopic(topicName).isPresent());
+
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testBasicDropTopicIfExists() throws Exception {
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      // create topic
+      String topicName = "topic6";
+      session.createTopic(topicName);
+
+      // drop topic if exists
+      session.dropTopicIfExists(topicName);
+      Assert.assertFalse(session.getTopic(topicName).isPresent());
+
+      // drop topic if exists
+      session.dropTopicIfExists(topicName);
+      Assert.assertFalse(session.getTopic(topicName).isPresent());
+
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
index b1a3dde0e7e..64fd083b152 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
@@ -87,14 +87,75 @@ public class SubscriptionSession extends Session {
 
   /////////////////////////////// topic ///////////////////////////////
 
+  /**
+   * Creates a topic with the specified name.
+   *
+   * <p>If the topic name contains single quotes, it must be enclosed in 
backticks (`). For example,
+   * to create a topic named 'topic', the value passed in as topicName should 
be `'topic'`.
+   *
+   * @param topicName If the created topic name contains single quotes, the passed 
parameter needs to be
+   *     enclosed in backticks.
+   * @throws IoTDBConnectionException If there is an issue with the connection 
to IoTDB.
+   * @throws StatementExecutionException If there is an issue executing the 
SQL statement.
+   */
   public void createTopic(final String topicName)
       throws IoTDBConnectionException, StatementExecutionException {
     final String sql = String.format("CREATE TOPIC %s", topicName);
     executeNonQueryStatement(sql);
   }
 
+  /**
+   * Creates a topic with the specified name only if it does not already exist.
+   *
+   * <p>This method is similar to {@link #createTopic(String)}, but includes 
the 'IF NOT EXISTS'
+   * condition. If the topic name contains single quotes, it must be enclosed 
in backticks (`).
+   *
+   * @param topicName If the created topic name contains single quotes, the passed 
parameter needs to be
+   *     enclosed in backticks.
+   * @throws IoTDBConnectionException If there is an issue with the connection 
to IoTDB.
+   * @throws StatementExecutionException If there is an issue executing the 
SQL statement.
+   */
+  public void createTopicIfNotExists(final String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final String sql = String.format("CREATE TOPIC IF NOT EXISTS %s", 
topicName);
+    executeNonQueryStatement(sql);
+  }
+
+  /**
+   * Creates a topic with the specified name and properties.
+   *
+   * <p>Topic names with single quotes must be enclosed in backticks (`). 
Property keys and values
+   * are included in the SQL statement automatically.
+   *
+   * @param topicName If the created topic name contains single quotes, the 
passed parameter needs
+   *     to be enclosed in backticks.
+   * @param properties A {@link Properties} object containing the topic's 
properties.
+   * @throws IoTDBConnectionException If a connection issue occurs with IoTDB.
+   * @throws StatementExecutionException If a statement execution issue occurs.
+   */
   public void createTopic(final String topicName, final Properties properties)
       throws IoTDBConnectionException, StatementExecutionException {
+    createTopic(topicName, properties, false);
+  }
+
+  /**
+   * Creates a topic with the specified properties if it does not already 
exist. Topic names with
+   * single quotes must be enclosed in backticks (`).
+   *
+   * @param topicName If the created topic name contains single quotes, the 
passed parameter needs
+   *     to be enclosed in backticks.
+   * @param properties A {@link Properties} object containing the topic's 
properties.
+   * @throws IoTDBConnectionException If a connection issue occurs.
+   * @throws StatementExecutionException If the SQL statement execution fails.
+   */
+  public void createTopicIfNotExists(final String topicName, final Properties 
properties)
+      throws IoTDBConnectionException, StatementExecutionException {
+    createTopic(topicName, properties, true);
+  }
+
+  private void createTopic(
+      final String topicName, final Properties properties, final boolean 
isSetIfNotExistsCondition)
+      throws IoTDBConnectionException, StatementExecutionException {
     if (properties.isEmpty()) {
       createTopic(topicName);
       return;
@@ -113,16 +174,47 @@ public class SubscriptionSession extends Session {
                 .append(','));
     sb.deleteCharAt(sb.length() - 1);
     sb.append(')');
-    final String sql = String.format("CREATE TOPIC %s WITH %s", topicName, sb);
+    final String sql =
+        isSetIfNotExistsCondition
+            ? String.format("CREATE TOPIC IF NOT EXISTS %s WITH %s", 
topicName, sb)
+            : String.format("CREATE TOPIC %s WITH %s", topicName, sb);
     executeNonQueryStatement(sql);
   }
 
+  /**
+   * Drops the specified topic.
+   *
+   * <p>This method removes the specified topic from the database. If the 
topic name contains single
+   * quotes, it must be enclosed in backticks (`).
+   *
+   * @param topicName The name of the topic to be deleted, if it contains 
single quotes, needs to be
+   *     enclosed in backticks.
+   * @throws IoTDBConnectionException If there is an issue with the connection 
to IoTDB.
+   * @throws StatementExecutionException If there is an issue executing the 
SQL statement.
+   */
   public void dropTopic(final String topicName)
       throws IoTDBConnectionException, StatementExecutionException {
     final String sql = String.format("DROP TOPIC %s", topicName);
     executeNonQueryStatement(sql);
   }
 
+  /**
+   * Drops the specified topic if it exists.
+   *
+   * <p>This method is similar to {@link #dropTopic(String)}, but includes the 
'IF EXISTS'
+   * condition. If the topic name contains single quotes, it must be enclosed 
in backticks (`).
+   *
+   * @param topicName The name of the topic to be deleted, if it contains 
single quotes, needs to be
+   *     enclosed in backticks.
+   * @throws IoTDBConnectionException If there is an issue with the connection 
to IoTDB.
+   * @throws StatementExecutionException If there is an issue executing the 
SQL statement.
+   */
+  public void dropTopicIfExists(final String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final String sql = String.format("DROP TOPIC IF EXISTS %s", topicName);
+    executeNonQueryStatement(sql);
+  }
+
   public Set<Topic> getTopics() throws IoTDBConnectionException, 
StatementExecutionException {
     final String sql = "SHOW TOPICS";
     try (final SessionDataSet dataSet = executeQueryStatement(sql)) {

Reply via email to