This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-4619 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c8f61e87a0277dcc1786199241c0e1f96e5a05e2 Author: JackieTien97 <[email protected]> AuthorDate: Thu Oct 20 20:13:08 2022 +0800 Add IT for CQ Management and Execute & change into syntax --- docs/UserGuide/Process-Data/Continuous-Query.md | 32 +- docs/zh/UserGuide/Process-Data/Continuous-Query.md | 18 +- .../org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java | 459 ++++++++++++++++++ .../java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java | 520 +++++++++++++++++++++ 4 files changed, 1004 insertions(+), 25 deletions(-) diff --git a/docs/UserGuide/Process-Data/Continuous-Query.md b/docs/UserGuide/Process-Data/Continuous-Query.md index 3d0fe550c9..b7f727bb0c 100644 --- a/docs/UserGuide/Process-Data/Continuous-Query.md +++ b/docs/UserGuide/Process-Data/Continuous-Query.md @@ -121,11 +121,11 @@ Use an `EVERY` interval in the `RESAMPLE` clause to specify the CQ’s execution ```sql CREATE CONTINUOUS QUERY cq1 RESAMPLE EVERY 20s -BEGIN - SELECT max_value(temperature) - INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max - FROM root.ln.*.* - GROUP BY time(10s) +BEGIN +SELECT max_value(temperature) + INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max) + FROM root.ln.*.* + GROUP BY time(10s) END ``` @@ -180,7 +180,7 @@ CREATE CONTINUOUS QUERY cq2 RESAMPLE RANGE 40s BEGIN SELECT max_value(temperature) - INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max + INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max) FROM root.ln.*.* GROUP BY time(10s) END @@ -255,7 +255,7 @@ CREATE CONTINUOUS QUERY cq3 RESAMPLE EVERY 20s RANGE 40s BEGIN SELECT max_value(temperature) - INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max + INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max) FROM root.ln.*.* GROUP BY time(10s) FILL(100.0) @@ -317,11 +317,11 @@ Use an `EVERY` interval and `RANGE` interval in the RESAMPLE clause to specify t ```sql CREATE CONTINUOUS QUERY cq4 RESAMPLE EVERY 20s RANGE 40s, 20s -BEGIN - SELECT max_value(temperature) - INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max - FROM root.ln.*.* - GROUP BY time(10s) +BEGIN + SELECT max_value(temperature) + INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max) + FROM root.ln.*.* + GROUP BY time(10s) FILL(100.0) END ``` @@ -376,10 +376,10 @@ Use an `EVERY` interval in the `RESAMPLE` clause to specify the CQ’s execution ```sql CREATE CONTINUOUS QUERY cq5 RESAMPLE EVERY 20s -BEGIN +BEGIN SELECT temperature + 1 - INTO root.precalculated_sg.::(temperature) - FROM root.ln.*.* + INTO root.precalculated_sg.::(temperature) + FROM root.ln.*.* align by device END ``` @@ -557,7 +557,7 @@ This step performs the nested sub query in from clause of the query above. The f CREATE CQ s1_count_cq BEGIN SELECT count(s1) - INTO root.sg_count.d.count_s1 + INTO root.sg_count.d(count_s1) FROM root.sg.d GROUP BY(30m) END diff --git a/docs/zh/UserGuide/Process-Data/Continuous-Query.md b/docs/zh/UserGuide/Process-Data/Continuous-Query.md index 0d22a04576..4527776535 100644 --- a/docs/zh/UserGuide/Process-Data/Continuous-Query.md +++ b/docs/zh/UserGuide/Process-Data/Continuous-Query.md @@ -122,11 +122,11 @@ END ```sql CREATE CONTINUOUS QUERY cq1 RESAMPLE EVERY 20s -BEGIN - SELECT max_value(temperature) - INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max - FROM root.ln.*.* - GROUP BY time(10s) +BEGIN + SELECT max_value(temperature) + INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max) + FROM root.ln.*.* + GROUP BY time(10s) END ``` @@ -182,7 +182,7 @@ CREATE CONTINUOUS QUERY cq2 RESAMPLE RANGE 40s BEGIN SELECT max_value(temperature) - INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max + INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max) FROM root.ln.*.* GROUP BY time(10s) END @@ -257,7 +257,7 @@ CREATE CONTINUOUS QUERY cq3 RESAMPLE EVERY 20s RANGE 40s BEGIN SELECT max_value(temperature) - INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max + INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max) FROM root.ln.*.* GROUP BY time(10s) FILL(100.0) @@ -322,7 +322,7 @@ CREATE CONTINUOUS QUERY cq4 RESAMPLE EVERY 20s RANGE 40s, 20s BEGIN SELECT max_value(temperature) - INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max + INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max) FROM root.ln.*.* GROUP BY time(10s) FILL(100.0) @@ -381,7 +381,7 @@ CREATE CONTINUOUS QUERY cq5 RESAMPLE EVERY 20s BEGIN SELECT temperature + 1 - INTO root.precalculated_sg.::(temperature) + INTO root.precalculated_sg.::(temperature) FROM root.ln.*.* align by device END diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java new file mode 100644 index 0000000000..83bb55c157 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java @@ -0,0 +1,459 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.it.cq; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category(ClusterIT.class) +@Ignore +public class IoTDBCQExecIT { + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initBeforeClass(); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanAfterClass(); + } + + @Test + public void testCQExecution1() { + String insertTemplate = + "INSERT INTO root.sg.d1(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)"; + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + long now = System.currentTimeMillis(); + long firstExecutionTime = now + 10_000; + long startTime = firstExecutionTime - 3_000; + + statement.execute("create timeseries root.sg.d1.s1 WITH DATATYPE=INT64"); + statement.execute("create timeseries root.sg.d1.s1_max WITH DATATYPE=INT64"); + + statement.execute( + String.format( + insertTemplate, + startTime, + 1, + startTime + 500, + 2, + startTime + 1_000, + 3, + startTime + 1_500, + 4, + startTime + 2_000, + 5, + startTime + 2_500, + 6, + startTime + 3_000, + 7, + startTime + 3_500, + 8, + startTime + 4_000, + 9, + startTime + 4_500, + 10)); + + statement.execute( + "CREATE CONTINUOUS QUERY cq1\n" + + "RESAMPLE EVERY 2s\n" + + "BBOUNDARY " + + firstExecutionTime + + "BEGIN \n" + + " SELECT max_value(s1) \n" + + " INTO root.sg.d1(s1_max)\n" + + " FROM root.sg.d1\n" + + " GROUP BY time(1s) \n" + + "END"); + + long targetTime = firstExecutionTime + 5_000; + + while (System.currentTimeMillis() - targetTime < 0) { + TimeUnit.SECONDS.sleep(1); + } + + long[] expectedTime = { + startTime + 1_000, startTime + 2_000, startTime + 3_000, startTime + 4_000 + }; + long[] expectedValue = {4, 6, 8, 10}; + + try (ResultSet resultSet = statement.executeQuery("select s1_max from root.sg.d1")) { + int cnt = 0; + while (resultSet.next()) { + assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR)); + assertEquals(expectedValue[cnt], resultSet.getLong("root.sg.d1.s1_max")); + cnt++; + } + assertEquals(expectedTime.length, cnt); + } + + statement.execute("DROP CQ cq1"); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCQExecution2() { + String insertTemplate = + "INSERT INTO root.sg.d2(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)"; + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + long now = System.currentTimeMillis(); + long firstExecutionTime = now + 10_000; + long startTime = firstExecutionTime - 4_000; + + statement.execute("create timeseries root.sg.d2.s1 WITH DATATYPE=INT64"); + statement.execute("create timeseries root.sg.d2.s1_max WITH DATATYPE=INT64"); + + statement.execute( + String.format( + insertTemplate, + startTime, + 1, + startTime + 500, + 2, + startTime + 1_000, + 3, + startTime + 1_500, + 4, + startTime + 2_000, + 5, + startTime + 2_500, + 6, + startTime + 3_000, + 7, + startTime + 3_500, + 8, + startTime + 4_000, + 9, + startTime + 4_500, + 10)); + + statement.execute( + "CREATE CONTINUOUS QUERY cq2\n" + + "RESAMPLE RANGE 4s\n" + + "BBOUNDARY " + + firstExecutionTime + + "BEGIN \n" + + " SELECT max_value(s1) \n" + + " INTO root.sg.d2(s1_max)\n" + + " FROM root.sg.d2\n" + + " GROUP BY time(1s) \n" + + "END"); + + long targetTime = firstExecutionTime + 5_000; + + while (System.currentTimeMillis() - targetTime < 0) { + TimeUnit.SECONDS.sleep(1); + } + + long[] expectedTime = { + startTime, startTime + 1_000, startTime + 2_000, startTime + 3_000, startTime + 4_000 + }; + long[] expectedValue = {2, 4, 6, 8, 10}; + + try (ResultSet resultSet = statement.executeQuery("select s1_max from root.sg.d2")) { + int cnt = 0; + while (resultSet.next()) { + assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR)); + assertEquals(expectedValue[cnt], resultSet.getLong("root.sg.d2.s1_max")); + cnt++; + } + assertEquals(expectedTime.length, cnt); + } + + statement.execute("DROP CQ cq2"); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCQExecution3() { + String insertTemplate = + "INSERT INTO root.sg.d3(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)"; + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + long now = System.currentTimeMillis(); + long firstExecutionTime = now + 10_000; + long startTime = firstExecutionTime - 4_000; + + statement.execute("create timeseries root.sg.d3.s1 WITH DATATYPE=INT64"); + statement.execute("create timeseries root.sg.d3.s1_max WITH DATATYPE=INT64"); + + statement.execute( + String.format( + insertTemplate, + startTime, + 1, + startTime + 500, + 2, + startTime + 1_000, + 3, + startTime + 1_500, + 4, + startTime + 2_000, + 5, + startTime + 2_500, + 6, + startTime + 3_000, + 7, + startTime + 3_500, + 8, + startTime + 4_000, + 9, + startTime + 4_500, + 10)); + + statement.execute( + "CREATE CONTINUOUS QUERY cq3\n" + + "RESAMPLE EVERY 20s RANGE 40s\n" + + "BBOUNDARY " + + firstExecutionTime + + "BEGIN \n" + + " SELECT max_value(s1) \n" + + " INTO root.sg.d3(s1_max)\n" + + " FROM root.sg.d3\n" + + " GROUP BY time(1s) \n" + + " FILL(100)\n" + + "END"); + + long targetTime = firstExecutionTime + 5_000; + + while (System.currentTimeMillis() - targetTime < 0) { + TimeUnit.SECONDS.sleep(1); + } + + long[] expectedTime = { + startTime - 1_000, + startTime, + startTime + 1_000, + startTime + 2_000, + startTime + 3_000, + startTime + 4_000 + }; + long[] expectedValue = {100, 2, 4, 6, 8, 10}; + + try (ResultSet resultSet = statement.executeQuery("select s1_max from root.sg.d3")) { + int cnt = 0; + while (resultSet.next()) { + assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR)); + assertEquals(expectedValue[cnt], resultSet.getLong("root.sg.d3.s1_max")); + cnt++; + } + assertEquals(expectedTime.length, cnt); + } + + statement.execute("DROP CQ cq3"); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCQExecution4() { + String insertTemplate = + "INSERT INTO root.sg.d4(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)"; + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + long now = System.currentTimeMillis(); + long firstExecutionTime = now + 10_000; + long startTime = firstExecutionTime - 4_000; + + statement.execute("create timeseries root.sg.d4.s1 WITH DATATYPE=INT64"); + statement.execute("create timeseries root.sg.d4.s1_max WITH DATATYPE=INT64"); + + statement.execute( + String.format( + insertTemplate, + startTime, + 1, + startTime + 500, + 2, + startTime + 1_000, + 3, + startTime + 1_500, + 4, + startTime + 2_000, + 5, + startTime + 2_500, + 6, + startTime + 3_000, + 7, + startTime + 3_500, + 8, + startTime + 4_000, + 9, + startTime + 4_500, + 10)); + + statement.execute( + "CREATE CONTINUOUS QUERY cq4\n" + + "RESAMPLE EVERY 20s RANGE 20s, 10s\n" + + "BBOUNDARY " + + firstExecutionTime + + "BEGIN \n" + + " SELECT max_value(s1) \n" + + " INTO root.sg.d4(s1_max)\n" + + " FROM root.sg.d4\n" + + " GROUP BY time(1s) \n" + + "END"); + + long targetTime = firstExecutionTime + 5_000; + + while (System.currentTimeMillis() - targetTime < 0) { + TimeUnit.SECONDS.sleep(1); + } + + long[] expectedTime = {startTime + 2_000, startTime + 4_000}; + long[] expectedValue = {6, 10}; + + try (ResultSet resultSet = statement.executeQuery("select s1_max from root.sg.d4")) { + int cnt = 0; + while (resultSet.next()) { + assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR)); + assertEquals(expectedValue[cnt], resultSet.getLong("root.sg.d4.s1_max")); + cnt++; + } + assertEquals(expectedTime.length, cnt); + } + + statement.execute("DROP CQ cq4"); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCQExecution5() { + String insertTemplate = + "INSERT INTO root.sg.d5(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)"; + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + long now = System.currentTimeMillis(); + long firstExecutionTime = now + 10_000; + long startTime = firstExecutionTime - 4_000; + + statement.execute("create timeseries root.sg.d5.s1 WITH DATATYPE=INT64"); + statement.execute("create timeseries root.sg.d5.precalculated_s1 WITH DATATYPE=INT64"); + + statement.execute( + String.format( + insertTemplate, + startTime, + 1, + startTime + 500, + 2, + startTime + 1_000, + 3, + startTime + 1_500, + 4, + startTime + 2_000, + 5, + startTime + 2_500, + 6, + startTime + 3_000, + 7, + startTime + 3_500, + 8, + startTime + 4_000, + 9, + startTime + 4_500, + 10)); + + statement.execute( + "CREATE CONTINUOUS QUERY cq5\n" + + "RESAMPLE EVERY 2s\n" + + "BBOUNDARY " + + firstExecutionTime + + "BEGIN \n" + + " SELECT s1 + 1 \n" + + " INTO root.sg.d5(precalculated_s1)\n" + + " FROM root.sg.d5\n" + + " align by device\n" + + "END"); + + long targetTime = firstExecutionTime + 5_000; + + while (System.currentTimeMillis() - targetTime < 0) { + TimeUnit.SECONDS.sleep(1); + } + + long[] expectedTime = { + startTime, + startTime + 500, + startTime + 1_000, + startTime + 1_500, + startTime + 2_000, + startTime + 2_500, + startTime + 3_000, + startTime + 3_500, + startTime + 4_000, + startTime + 4_500 + }; + long[] expectedValue = {2, 3, 4, 5, 6, 7, 8, 9, 10, 11}; + + try (ResultSet resultSet = + statement.executeQuery("select precalculated_s1 from root.sg.d5")) { + int cnt = 0; + while (resultSet.next()) { + assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR)); + assertEquals(expectedValue[cnt], resultSet.getLong("root.sg.d5.precalculated_s1")); + cnt++; + } + assertEquals(expectedTime.length, cnt); + } + + statement.execute("DROP CQ cq5"); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java new file mode 100644 index 0000000000..49a4853190 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java @@ -0,0 +1,520 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.it.cq; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category(ClusterIT.class) +@Ignore +public class IoTDBCQIT { + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initBeforeClass(); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanAfterClass(); + } + + // =======================================create cq====================================== + @Test + public void testCreateWrongCQ() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + + // 1. specify first parameter of group by time + try { + String sql = + "CREATE CQ s1_count_cq \n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY([0, 10), 30m)\n" + + "END"; + statement.execute(sql); + fail(); + } catch (Exception e) { + // TODO add assert for error message + } + + // 2. specify time filter in where clause + try { + String sql = + "CREATE CQ s1_count_cq \n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " WHERE time >= 0 and time <= 10\n" + + " GROUP BY(30m)\n" + + "END"; + statement.execute(sql); + fail(); + } catch (Exception e) { + // TODO add assert for error message + } + + // 3. no every clause meanwhile no group by time + try { + String sql = + "CREATE CQ s1_count_cq \n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + "END"; + statement.execute(sql); + fail(); + } catch (Exception e) { + // TODO add assert for error message + } + + // 4. no INTO clause + try { + String sql = + "CREATE CQ s1_count_cq_2\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END"; + statement.execute(sql); + fail(); + } catch (Exception e) { + // TODO add assert for error message + } + + // 5. EVERY interval is less than continuous_query_min_every_interval_in_ms in + // iotdb-confignode.properties + try { + String sql = + "CREATE CQ s1_count_cq_2\n" + + "RESAMPLE EVERY 50ms\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END"; + statement.execute(sql); + fail(); + } catch (Exception e) { + // TODO add assert for error message + } + + // 6. start_time_offset < 0 + try { + String sql = + "CREATE CQ s1_count_cq_2\n" + + "RESAMPLE RANGE -1m\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END"; + statement.execute(sql); + fail(); + } catch (Exception e) { + // TODO add assert for error message + } + + // 7. start_time_offset == 0 + try { + String sql = + "CREATE CQ s1_count_cq_2\n" + + "RESAMPLE RANGE 0m\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END"; + statement.execute(sql); + fail(); + } catch (Exception e) { + // TODO add assert for error message + } + + // 8. end_time_offset < 0 + try { + String sql = + "CREATE CQ s1_count_cq_2\n" + + "RESAMPLE RANGE 30m, -1m\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END"; + statement.execute(sql); + fail(); + } catch (Exception e) { + // TODO add assert for error message + } + + // 9. end_time_offset == start_time_offset + try { + String sql = + "CREATE CQ s1_count_cq_2\n" + + "RESAMPLE RANGE 30m, 30m\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END"; + statement.execute(sql); + fail(); + } catch (Exception e) { + // TODO add assert for error message + } + + // 10. end_time_offset > start_time_offset + try { + String sql = + "CREATE CQ s1_count_cq_2\n" + + "RESAMPLE RANGE 30m, 31m\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END"; + statement.execute(sql); + fail(); + } catch (Exception e) { + // TODO add assert for error message + } + + // 11. group_by_interval > start_time_offset + try { + String sql = + "CREATE CQ s1_count_cq_2\n" + + "RESAMPLE RANGE 30m\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(1h)\n" + + "END"; + statement.execute(sql); + fail(); + } catch (Exception e) { + // TODO add assert for error message + } + + // 12. TIMEOUT POLICY is not BLOCKED or DISCARD + try { + String sql = + "CREATE CQ s1_count_cq_2\n" + + "RESAMPLE RANGE 30m\n" + + "TIMEOUT POLICY UNKNOWN\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(1h)\n" + + "END"; + statement.execute(sql); + fail(); + } catch (Exception e) { + // TODO add assert for error message + } + + // 13. create duplicated cq + try { + String sql = + "CREATE CQ s1_count_cq \n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END"; + statement.execute(sql); + statement.execute(sql); + fail(); + } catch (Exception e) { + // TODO add assert for error message + } + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCreateCorrectCQ() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + + try { + String sql = + "CREATE CQ correct_cq_1 \n" + + "RESAMPLE \n" + + " EVERY 30m\n" + + " BOUNDARY 0\n" + + " RANGE 30m, 10m\n" + + "TIMEOUT POLICY BLOCKED\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END"; + statement.execute(sql); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try { + String sql = + "CREATE CQ correct_cq_2\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END"; + statement.execute(sql); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try { + String sql = + "CREATE CQ correct_cq_3\n" + + "RESAMPLE RANGE 30m, 0m\n" + + "TIMEOUT POLICY DISCARD\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(10m)\n" + + "END"; + statement.execute(sql); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try { + String sql = + "CREATE CQ s1_count_cq_correct\n" + + "RESAMPLE EVERY 30m \n" + + "TIMEOUT POLICY DISCARD\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(10m)\n" + + "END"; + statement.execute(sql); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // =======================================show cq====================================== + @Test + public void testShowCQ() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + + String[] cqIds = {"show_cq_1", "show_cq_2", "show_cq_3", "show_cq_4"}; + String[] cqSQLs = { + "CREATE CQ show_cq_1 \n" + + "RESAMPLE \n" + + " EVERY 30m\n" + + " BOUNDARY 0\n" + + " RANGE 30m, 10m\n" + + "TIMEOUT POLICY BLOCKED\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END", + "CREATE CQ show_cq_2\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END", + "CREATE CQ show_cq_3\n" + + "RESAMPLE RANGE 30m, 0m\n" + + "TIMEOUT POLICY DISCARD\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(10m)\n" + + "END", + "CREATE CQ show_cq_4\n" + + "RESAMPLE EVERY 30m \n" + + "TIMEOUT POLICY DISCARD\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(10m)\n" + + "END" + }; + + for (String sql : cqSQLs) { + statement.execute(sql); + } + + try (ResultSet resultSet = statement.executeQuery("show CQS")) { + + int cnt = 0; + while (resultSet.next()) { + // No need to add time column for aggregation query + assertEquals(cqIds[cnt], resultSet.getString(0)); + assertEquals(cqSQLs[cnt], resultSet.getString(1)); + assertEquals("ACTIVE", resultSet.getString(2)); + cnt++; + } + assertEquals(cqIds.length, cnt); + } + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // =======================================drop cq====================================== + @Test + public void testDropCQ() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + + String[] cqIds = {"drop_cq_1", "drop_cq_2", "drop_cq_3", "drop_cq_4"}; + String[] cqSQLs = { + "CREATE CQ drop_cq_1 \n" + + "RESAMPLE \n" + + " EVERY 30m\n" + + " BOUNDARY 0\n" + + " RANGE 30m, 10m\n" + + "TIMEOUT POLICY BLOCKED\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END", + "CREATE CQ drop_cq_2\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END", + "CREATE CQ drop_cq_3\n" + + "RESAMPLE RANGE 30m, 0m\n" + + "TIMEOUT POLICY DISCARD\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(10m)\n" + + "END", + "CREATE CQ drop_cq_4\n" + + "RESAMPLE EVERY 30m \n" + + "TIMEOUT POLICY DISCARD\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(10m)\n" + + "END" + }; + + for (String sql : cqSQLs) { + statement.execute(sql); + } + + try (ResultSet resultSet = statement.executeQuery("show CQS")) { + + int cnt = 0; + while (resultSet.next()) { + // No need to add time column for aggregation query + assertEquals(cqIds[cnt], resultSet.getString(0)); + assertEquals(cqSQLs[cnt], resultSet.getString(1)); + assertEquals("ACTIVE", resultSet.getString(2)); + cnt++; + } + assertEquals(cqIds.length, cnt); + } + + statement.execute("DROP CQ drop_cq_2"); + statement.execute("DROP CQ drop_cq_3"); + + int[] resultIndex = {0, 3}; + + try (ResultSet resultSet = statement.executeQuery("show CQS")) { + + int cnt = 0; + while (resultSet.next()) { + // No need to add time column for aggregation query + assertEquals(cqIds[resultIndex[cnt]], resultSet.getString(0)); + assertEquals(cqSQLs[resultIndex[cnt]], resultSet.getString(1)); + assertEquals("ACTIVE", resultSet.getString(2)); + cnt++; + } + assertEquals(resultIndex.length, cnt); + } + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +}
