This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 02657b7b0dc134c81ddadce530a5b0a2a2171ecf Author: Zhenyu Luo <[email protected]> AuthorDate: Mon Aug 5 12:38:16 2024 +0800 Subscription: java client supports Create Topic If Not Exists and Drop Topic If Exists (#13081) (cherry picked from commit 86ddd749cc00d6e80d7b4421e603c24945dcc18a) --- .../it/local/IoTDBSubscriptionTopicIT.java | 178 +++++++++++++++++++++ .../session/subscription/SubscriptionSession.java | 94 ++++++++++- 2 files changed, 271 insertions(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicIT.java new file mode 100644 index 00000000000..55f618b83f4 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicIT.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.subscription.it.local; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.session.subscription.SubscriptionSession; +import org.apache.iotdb.session.subscription.model.Topic; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Optional; +import java.util.Properties; + +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class}) +public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionLocalIT { + + @Test + public void testBasicCreateTopic() throws Exception { + final String host = EnvFactory.getEnv().getIP(); + final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); + + try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + // create topic + String topicName = "topic1"; + session.createTopic(topicName); + Assert.assertTrue(session.getTopic(topicName).isPresent()); + Assert.assertEquals(topicName, session.getTopic(topicName).get().getTopicName()); + + // create topic + topicName = "topic2"; + Properties properties = new Properties(); + properties.put("path", "root.**"); + properties.put("start-time", "2023-01-01"); + properties.put("end-time", "2023-12-31"); + properties.put("format", "TsFileHandler"); + session.createTopic(topicName, properties); + Optional<Topic> topic = session.getTopic(topicName); + Assert.assertTrue(topic.isPresent()); + Assert.assertEquals(topicName, topic.get().getTopicName()); + // verify topic parameters + Assert.assertTrue(topic.get().getTopicAttributes().contains("path=root.**")); + Assert.assertTrue(topic.get().getTopicAttributes().contains("start-time=2023-01-01")); + Assert.assertTrue(topic.get().getTopicAttributes().contains("end-time=2023-12-31")); + 
Assert.assertTrue(topic.get().getTopicAttributes().contains("format=TsFileHandler")); + + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testBasicCreateTopicIfNotExists() throws Exception { + final String host = EnvFactory.getEnv().getIP(); + final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); + + try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + // create topic if not exists + String topicName = "topic3"; + session.createTopicIfNotExists(topicName); + Optional<Topic> topic = session.getTopic(topicName); + Assert.assertTrue(topic.isPresent()); + Assert.assertEquals(topicName, topic.get().getTopicName()); + + // create topic if not exists + session.createTopicIfNotExists(topicName); + topic = session.getTopic(topicName); + Assert.assertTrue(topic.isPresent()); + Assert.assertEquals(topicName, topic.get().getTopicName()); + + // create topic if not exists + topicName = "topic4"; + Properties properties = new Properties(); + properties.put("path", "root.**"); + properties.put("start-time", "2023-01-01"); + properties.put("end-time", "2023-12-31"); + properties.put("format", "TsFileHandler"); + session.createTopicIfNotExists(topicName, properties); + topic = session.getTopic(topicName); + Assert.assertTrue(topic.isPresent()); + Assert.assertEquals(topicName, topic.get().getTopicName()); + // verify topic parameters + Assert.assertTrue(topic.get().getTopicAttributes().contains("path=root.**")); + Assert.assertTrue(topic.get().getTopicAttributes().contains("start-time=2023-01-01")); + Assert.assertTrue(topic.get().getTopicAttributes().contains("end-time=2023-12-31")); + Assert.assertTrue(topic.get().getTopicAttributes().contains("format=TsFileHandler")); + + // create topic if not exists + properties.put("start-time", "2023-01-02"); + session.createTopicIfNotExists(topicName, properties); + topic = session.getTopic(topicName); + 
Assert.assertTrue(topic.isPresent()); + Assert.assertEquals(topicName, topic.get().getTopicName()); + // Verify Topic Parameters + Assert.assertTrue(topic.get().getTopicAttributes().contains("path=root.**")); + Assert.assertTrue(topic.get().getTopicAttributes().contains("start-time=2023-01-01")); + Assert.assertFalse(topic.get().getTopicAttributes().contains("start-time=2023-01-02")); + Assert.assertTrue(topic.get().getTopicAttributes().contains("end-time=2023-12-31")); + Assert.assertTrue(topic.get().getTopicAttributes().contains("format=TsFileHandler")); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testBasicDropTopic() throws Exception { + final String host = EnvFactory.getEnv().getIP(); + final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); + + try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + // create topic + String topicName = "topic5"; + session.createTopic(topicName); + + // drop topic + session.dropTopic(topicName); + Assert.assertFalse(session.getTopic(topicName).isPresent()); + + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testBasicDropTopicIfExists() throws Exception { + final String host = EnvFactory.getEnv().getIP(); + final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); + + try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + // create topic + String topicName = "topic6"; + session.createTopic(topicName); + + // drop topic if exists + session.dropTopicIfExists(topicName); + Assert.assertFalse(session.getTopic(topicName).isPresent()); + + // drop topic if exists + session.dropTopicIfExists(topicName); + Assert.assertFalse(session.getTopic(topicName).isPresent()); + + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java index b1a3dde0e7e..64fd083b152 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java @@ -87,14 +87,75 @@ public class SubscriptionSession extends Session { /////////////////////////////// topic /////////////////////////////// + /** + * Creates a topic with the specified name. + * + * <p>If the topic name contains single quotes, it must be enclosed in backticks (`). For example, + * to create a topic named 'topic', the value passed in as topicName should be `'topic'`. + * + * @param topicName If the created topic name contains single quotes, the passed parameter needs to be + * enclosed in backticks. + * @throws IoTDBConnectionException If there is an issue with the connection to IoTDB. + * @throws StatementExecutionException If there is an issue executing the SQL statement. + */ public void createTopic(final String topicName) throws IoTDBConnectionException, StatementExecutionException { final String sql = String.format("CREATE TOPIC %s", topicName); executeNonQueryStatement(sql); } + /** + * Creates a topic with the specified name only if it does not already exist. + * + * <p>This method is similar to {@link #createTopic(String)}, but includes the 'IF NOT EXISTS' + * condition. If the topic name contains single quotes, it must be enclosed in backticks (`). + * + * @param topicName If the created topic name contains single quotes, the passed parameter needs to be + * enclosed in backticks. + * @throws IoTDBConnectionException If there is an issue with the connection to IoTDB. + * @throws StatementExecutionException If there is an issue executing the SQL statement. 
+ */ + public void createTopicIfNotExists(final String topicName) + throws IoTDBConnectionException, StatementExecutionException { + final String sql = String.format("CREATE TOPIC IF NOT EXISTS %s", topicName); + executeNonQueryStatement(sql); + } + + /** + * Creates a topic with the specified name and properties. + * + * <p>Topic names with single quotes must be enclosed in backticks (`). Property keys and values + * are included in the SQL statement automatically. + * + * @param topicName If the created topic name contains single quotes, the passed parameter needs + * to be enclosed in backticks. + * @param properties A {@link Properties} object containing the topic's properties. + * @throws IoTDBConnectionException If a connection issue occurs with IoTDB. + * @throws StatementExecutionException If a statement execution issue occurs. + */ public void createTopic(final String topicName, final Properties properties) throws IoTDBConnectionException, StatementExecutionException { + createTopic(topicName, properties, false); + } + + /** + * Creates a topic with the specified properties if it does not already exist. Topic names with + * single quotes must be enclosed in backticks (`). + * + * @param topicName If the created topic name contains single quotes, the passed parameter needs + * to be enclosed in backticks. + * @param properties A {@link Properties} object containing the topic's properties. + * @throws IoTDBConnectionException If a connection issue occurs. + * @throws StatementExecutionException If the SQL statement execution fails. 
+ */ + public void createTopicIfNotExists(final String topicName, final Properties properties) + throws IoTDBConnectionException, StatementExecutionException { + createTopic(topicName, properties, true); + } + + private void createTopic( + final String topicName, final Properties properties, final boolean isSetIfNotExistsCondition) + throws IoTDBConnectionException, StatementExecutionException { if (properties.isEmpty()) { createTopic(topicName); return; @@ -113,16 +174,47 @@ public class SubscriptionSession extends Session { .append(',')); sb.deleteCharAt(sb.length() - 1); sb.append(')'); - final String sql = String.format("CREATE TOPIC %s WITH %s", topicName, sb); + final String sql = + isSetIfNotExistsCondition + ? String.format("CREATE TOPIC IF NOT EXISTS %s WITH %s", topicName, sb) + : String.format("CREATE TOPIC %s WITH %s", topicName, sb); executeNonQueryStatement(sql); } + /** + * Drops the specified topic. + * + * <p>This method removes the specified topic from the database. If the topic name contains single + * quotes, it must be enclosed in backticks (`). + * + * @param topicName The name of the topic to be deleted, if it contains single quotes, needs to be + * enclosed in backticks. + * @throws IoTDBConnectionException If there is an issue with the connection to IoTDB. + * @throws StatementExecutionException If there is an issue executing the SQL statement. + */ public void dropTopic(final String topicName) throws IoTDBConnectionException, StatementExecutionException { final String sql = String.format("DROP TOPIC %s", topicName); executeNonQueryStatement(sql); } + /** + * Drops the specified topic if it exists. + * + * <p>This method is similar to {@link #dropTopic(String)}, but includes the 'IF EXISTS' + * condition. If the topic name contains single quotes, it must be enclosed in backticks (`). + * + * @param topicName The name of the topic to be deleted, if it contains single quotes, needs to be + * enclosed in backticks. 
+ * @throws IoTDBConnectionException If there is an issue with the connection to IoTDB. + * @throws StatementExecutionException If there is an issue executing the SQL statement. + */ + public void dropTopicIfExists(final String topicName) + throws IoTDBConnectionException, StatementExecutionException { + final String sql = String.format("DROP TOPIC IF EXISTS %s", topicName); + executeNonQueryStatement(sql); + } + public Set<Topic> getTopics() throws IoTDBConnectionException, StatementExecutionException { final String sql = "SHOW TOPICS"; try (final SessionDataSet dataSet = executeQueryStatement(sql)) {
