This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c8f61e87a0277dcc1786199241c0e1f96e5a05e2
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Oct 20 20:13:08 2022 +0800

    Add IT for CQ Management and Execute & change into syntax
---
 docs/UserGuide/Process-Data/Continuous-Query.md    |  32 +-
 docs/zh/UserGuide/Process-Data/Continuous-Query.md |  18 +-
 .../org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java   | 459 ++++++++++++++++++
 .../java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java  | 520 +++++++++++++++++++++
 4 files changed, 1004 insertions(+), 25 deletions(-)

diff --git a/docs/UserGuide/Process-Data/Continuous-Query.md 
b/docs/UserGuide/Process-Data/Continuous-Query.md
index 3d0fe550c9..b7f727bb0c 100644
--- a/docs/UserGuide/Process-Data/Continuous-Query.md
+++ b/docs/UserGuide/Process-Data/Continuous-Query.md
@@ -121,11 +121,11 @@ Use an `EVERY` interval in the `RESAMPLE` clause to 
specify the CQ’s execution
 ```sql
 CREATE CONTINUOUS QUERY cq1
 RESAMPLE EVERY 20s
-BEGIN 
-  SELECT max_value(temperature) 
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, 
root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
-  FROM root.ln.*.* 
-  GROUP BY time(10s) 
+BEGIN
+SELECT max_value(temperature)
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), 
root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
+  FROM root.ln.*.*
+  GROUP BY time(10s)
 END
 ```
 
@@ -180,7 +180,7 @@ CREATE CONTINUOUS QUERY cq2
 RESAMPLE RANGE 40s
 BEGIN
   SELECT max_value(temperature)
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, 
root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), 
root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
   GROUP BY time(10s)
 END
@@ -255,7 +255,7 @@ CREATE CONTINUOUS QUERY cq3
 RESAMPLE EVERY 20s RANGE 40s
 BEGIN
   SELECT max_value(temperature)
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, 
root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), 
root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
   GROUP BY time(10s)
   FILL(100.0)
@@ -317,11 +317,11 @@ Use an `EVERY` interval and `RANGE` interval in the 
RESAMPLE clause to specify t
 ```sql
 CREATE CONTINUOUS QUERY cq4
 RESAMPLE EVERY 20s RANGE 40s, 20s
-BEGIN 
-  SELECT max_value(temperature) 
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, 
root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
-  FROM root.ln.*.* 
-  GROUP BY time(10s) 
+BEGIN
+  SELECT max_value(temperature)
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), 
root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
+  FROM root.ln.*.*
+  GROUP BY time(10s)
   FILL(100.0)
 END
 ```
@@ -376,10 +376,10 @@ Use an `EVERY` interval in the `RESAMPLE` clause to 
specify the CQ’s execution
 ```sql
 CREATE CONTINUOUS QUERY cq5
 RESAMPLE EVERY 20s
-BEGIN 
+BEGIN
   SELECT temperature + 1
-  INTO  root.precalculated_sg.::(temperature)
-  FROM root.ln.*.* 
+  INTO root.precalculated_sg.::(temperature)
+  FROM root.ln.*.*
   align by device
 END
 ```
@@ -557,7 +557,7 @@ This step performs the nested sub query in from clause of 
the query above. The f
 CREATE CQ s1_count_cq 
 BEGIN 
     SELECT count(s1)  
-        INTO root.sg_count.d.count_s1
+        INTO root.sg_count.d(count_s1)
         FROM root.sg.d
         GROUP BY(30m)
 END
diff --git a/docs/zh/UserGuide/Process-Data/Continuous-Query.md 
b/docs/zh/UserGuide/Process-Data/Continuous-Query.md
index 0d22a04576..4527776535 100644
--- a/docs/zh/UserGuide/Process-Data/Continuous-Query.md
+++ b/docs/zh/UserGuide/Process-Data/Continuous-Query.md
@@ -122,11 +122,11 @@ END
 ```sql
 CREATE CONTINUOUS QUERY cq1
 RESAMPLE EVERY 20s
-BEGIN 
-  SELECT max_value(temperature) 
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, 
root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
-  FROM root.ln.*.* 
-  GROUP BY time(10s) 
+BEGIN
+  SELECT max_value(temperature)
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), 
root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
+  FROM root.ln.*.*
+  GROUP BY time(10s)
 END
 ```
 
@@ -182,7 +182,7 @@ CREATE CONTINUOUS QUERY cq2
 RESAMPLE RANGE 40s
 BEGIN
   SELECT max_value(temperature)
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, 
root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), 
root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
   GROUP BY time(10s)
 END
@@ -257,7 +257,7 @@ CREATE CONTINUOUS QUERY cq3
 RESAMPLE EVERY 20s RANGE 40s
 BEGIN
   SELECT max_value(temperature)
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, 
root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), 
root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
   GROUP BY time(10s)
   FILL(100.0)
@@ -322,7 +322,7 @@ CREATE CONTINUOUS QUERY cq4
 RESAMPLE EVERY 20s RANGE 40s, 20s
 BEGIN
   SELECT max_value(temperature)
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, 
root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), 
root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
   GROUP BY time(10s)
   FILL(100.0)
@@ -381,7 +381,7 @@ CREATE CONTINUOUS QUERY cq5
 RESAMPLE EVERY 20s
 BEGIN
   SELECT temperature + 1
-  INTO  root.precalculated_sg.::(temperature)
+  INTO root.precalculated_sg.::(temperature)
   FROM root.ln.*.*
   align by device
 END
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
new file mode 100644
index 0000000000..83bb55c157
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.cq;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category(ClusterIT.class)
+@Ignore
+public class IoTDBCQExecIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initBeforeClass();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+  }
+
+  @Test
+  public void testCQExecution1() {
+    String insertTemplate =
+        "INSERT INTO root.sg.d1(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) 
(%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long now = System.currentTimeMillis();
+      long firstExecutionTime = now + 10_000;
+      long startTime = firstExecutionTime - 3_000;
+
+      statement.execute("create timeseries root.sg.d1.s1 WITH DATATYPE=INT64");
+      statement.execute("create timeseries root.sg.d1.s1_max WITH 
DATATYPE=INT64");
+
+      statement.execute(
+          String.format(
+              insertTemplate,
+              startTime,
+              1,
+              startTime + 500,
+              2,
+              startTime + 1_000,
+              3,
+              startTime + 1_500,
+              4,
+              startTime + 2_000,
+              5,
+              startTime + 2_500,
+              6,
+              startTime + 3_000,
+              7,
+              startTime + 3_500,
+              8,
+              startTime + 4_000,
+              9,
+              startTime + 4_500,
+              10));
+
+      statement.execute(
+          "CREATE CONTINUOUS QUERY cq1\n"
+              + "RESAMPLE EVERY 2s\n"
+              + "BBOUNDARY "
+              + firstExecutionTime
+              + "BEGIN \n"
+              + "  SELECT max_value(s1) \n"
+              + "  INTO root.sg.d1(s1_max)\n"
+              + "  FROM root.sg.d1\n"
+              + "  GROUP BY time(1s) \n"
+              + "END");
+
+      long targetTime = firstExecutionTime + 5_000;
+
+      while (System.currentTimeMillis() - targetTime < 0) {
+        TimeUnit.SECONDS.sleep(1);
+      }
+
+      long[] expectedTime = {
+        startTime + 1_000, startTime + 2_000, startTime + 3_000, startTime + 
4_000
+      };
+      long[] expectedValue = {4, 6, 8, 10};
+
+      try (ResultSet resultSet = statement.executeQuery("select s1_max from 
root.sg.d1")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR));
+          assertEquals(expectedValue[cnt], 
resultSet.getLong("root.sg.d1.s1_max"));
+          cnt++;
+        }
+        assertEquals(expectedTime.length, cnt);
+      }
+
+      statement.execute("DROP CQ cq1");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCQExecution2() {
+    String insertTemplate =
+        "INSERT INTO root.sg.d2(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) 
(%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long now = System.currentTimeMillis();
+      long firstExecutionTime = now + 10_000;
+      long startTime = firstExecutionTime - 4_000;
+
+      statement.execute("create timeseries root.sg.d2.s1 WITH DATATYPE=INT64");
+      statement.execute("create timeseries root.sg.d2.s1_max WITH 
DATATYPE=INT64");
+
+      statement.execute(
+          String.format(
+              insertTemplate,
+              startTime,
+              1,
+              startTime + 500,
+              2,
+              startTime + 1_000,
+              3,
+              startTime + 1_500,
+              4,
+              startTime + 2_000,
+              5,
+              startTime + 2_500,
+              6,
+              startTime + 3_000,
+              7,
+              startTime + 3_500,
+              8,
+              startTime + 4_000,
+              9,
+              startTime + 4_500,
+              10));
+
+      statement.execute(
+          "CREATE CONTINUOUS QUERY cq2\n"
+              + "RESAMPLE RANGE 4s\n"
+              + "BBOUNDARY "
+              + firstExecutionTime
+              + "BEGIN \n"
+              + "  SELECT max_value(s1) \n"
+              + "  INTO root.sg.d2(s1_max)\n"
+              + "  FROM root.sg.d2\n"
+              + "  GROUP BY time(1s) \n"
+              + "END");
+
+      long targetTime = firstExecutionTime + 5_000;
+
+      while (System.currentTimeMillis() - targetTime < 0) {
+        TimeUnit.SECONDS.sleep(1);
+      }
+
+      long[] expectedTime = {
+        startTime, startTime + 1_000, startTime + 2_000, startTime + 3_000, 
startTime + 4_000
+      };
+      long[] expectedValue = {2, 4, 6, 8, 10};
+
+      try (ResultSet resultSet = statement.executeQuery("select s1_max from 
root.sg.d2")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR));
+          assertEquals(expectedValue[cnt], 
resultSet.getLong("root.sg.d2.s1_max"));
+          cnt++;
+        }
+        assertEquals(expectedTime.length, cnt);
+      }
+
+      statement.execute("DROP CQ cq2");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCQExecution3() {
+    String insertTemplate =
+        "INSERT INTO root.sg.d3(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) 
(%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long now = System.currentTimeMillis();
+      long firstExecutionTime = now + 10_000;
+      long startTime = firstExecutionTime - 4_000;
+
+      statement.execute("create timeseries root.sg.d3.s1 WITH DATATYPE=INT64");
+      statement.execute("create timeseries root.sg.d3.s1_max WITH 
DATATYPE=INT64");
+
+      statement.execute(
+          String.format(
+              insertTemplate,
+              startTime,
+              1,
+              startTime + 500,
+              2,
+              startTime + 1_000,
+              3,
+              startTime + 1_500,
+              4,
+              startTime + 2_000,
+              5,
+              startTime + 2_500,
+              6,
+              startTime + 3_000,
+              7,
+              startTime + 3_500,
+              8,
+              startTime + 4_000,
+              9,
+              startTime + 4_500,
+              10));
+
+      statement.execute(
+          "CREATE CONTINUOUS QUERY cq3\n"
+              + "RESAMPLE EVERY 20s RANGE 40s\n"
+              + "BBOUNDARY "
+              + firstExecutionTime
+              + "BEGIN \n"
+              + "  SELECT max_value(s1) \n"
+              + "  INTO root.sg.d3(s1_max)\n"
+              + "  FROM root.sg.d3\n"
+              + "  GROUP BY time(1s) \n"
+              + "  FILL(100)\n"
+              + "END");
+
+      long targetTime = firstExecutionTime + 5_000;
+
+      while (System.currentTimeMillis() - targetTime < 0) {
+        TimeUnit.SECONDS.sleep(1);
+      }
+
+      long[] expectedTime = {
+        startTime - 1_000,
+        startTime,
+        startTime + 1_000,
+        startTime + 2_000,
+        startTime + 3_000,
+        startTime + 4_000
+      };
+      long[] expectedValue = {100, 2, 4, 6, 8, 10};
+
+      try (ResultSet resultSet = statement.executeQuery("select s1_max from 
root.sg.d3")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR));
+          assertEquals(expectedValue[cnt], 
resultSet.getLong("root.sg.d3.s1_max"));
+          cnt++;
+        }
+        assertEquals(expectedTime.length, cnt);
+      }
+
+      statement.execute("DROP CQ cq3");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCQExecution4() {
+    String insertTemplate =
+        "INSERT INTO root.sg.d4(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) 
(%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long now = System.currentTimeMillis();
+      long firstExecutionTime = now + 10_000;
+      long startTime = firstExecutionTime - 4_000;
+
+      statement.execute("create timeseries root.sg.d4.s1 WITH DATATYPE=INT64");
+      statement.execute("create timeseries root.sg.d4.s1_max WITH 
DATATYPE=INT64");
+
+      statement.execute(
+          String.format(
+              insertTemplate,
+              startTime,
+              1,
+              startTime + 500,
+              2,
+              startTime + 1_000,
+              3,
+              startTime + 1_500,
+              4,
+              startTime + 2_000,
+              5,
+              startTime + 2_500,
+              6,
+              startTime + 3_000,
+              7,
+              startTime + 3_500,
+              8,
+              startTime + 4_000,
+              9,
+              startTime + 4_500,
+              10));
+
+      statement.execute(
+          "CREATE CONTINUOUS QUERY cq4\n"
+              + "RESAMPLE EVERY 20s RANGE 20s, 10s\n"
+              + "BBOUNDARY "
+              + firstExecutionTime
+              + "BEGIN \n"
+              + "  SELECT max_value(s1) \n"
+              + "  INTO root.sg.d4(s1_max)\n"
+              + "  FROM root.sg.d4\n"
+              + "  GROUP BY time(1s) \n"
+              + "END");
+
+      long targetTime = firstExecutionTime + 5_000;
+
+      while (System.currentTimeMillis() - targetTime < 0) {
+        TimeUnit.SECONDS.sleep(1);
+      }
+
+      long[] expectedTime = {startTime + 2_000, startTime + 4_000};
+      long[] expectedValue = {6, 10};
+
+      try (ResultSet resultSet = statement.executeQuery("select s1_max from 
root.sg.d4")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR));
+          assertEquals(expectedValue[cnt], 
resultSet.getLong("root.sg.d4.s1_max"));
+          cnt++;
+        }
+        assertEquals(expectedTime.length, cnt);
+      }
+
+      statement.execute("DROP CQ cq4");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCQExecution5() {
+    String insertTemplate =
+        "INSERT INTO root.sg.d5(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) 
(%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long now = System.currentTimeMillis();
+      long firstExecutionTime = now + 10_000;
+      long startTime = firstExecutionTime - 4_000;
+
+      statement.execute("create timeseries root.sg.d5.s1 WITH DATATYPE=INT64");
+      statement.execute("create timeseries root.sg.d5.precalculated_s1 WITH 
DATATYPE=INT64");
+
+      statement.execute(
+          String.format(
+              insertTemplate,
+              startTime,
+              1,
+              startTime + 500,
+              2,
+              startTime + 1_000,
+              3,
+              startTime + 1_500,
+              4,
+              startTime + 2_000,
+              5,
+              startTime + 2_500,
+              6,
+              startTime + 3_000,
+              7,
+              startTime + 3_500,
+              8,
+              startTime + 4_000,
+              9,
+              startTime + 4_500,
+              10));
+
+      statement.execute(
+          "CREATE CONTINUOUS QUERY cq5\n"
+              + "RESAMPLE EVERY 2s\n"
+              + "BBOUNDARY "
+              + firstExecutionTime
+              + "BEGIN \n"
+              + "  SELECT s1 + 1 \n"
+              + "  INTO root.sg.d5(precalculated_s1)\n"
+              + "  FROM root.sg.d5\n"
+              + "  align by device\n"
+              + "END");
+
+      long targetTime = firstExecutionTime + 5_000;
+
+      while (System.currentTimeMillis() - targetTime < 0) {
+        TimeUnit.SECONDS.sleep(1);
+      }
+
+      long[] expectedTime = {
+        startTime,
+        startTime + 500,
+        startTime + 1_000,
+        startTime + 1_500,
+        startTime + 2_000,
+        startTime + 2_500,
+        startTime + 3_000,
+        startTime + 3_500,
+        startTime + 4_000,
+        startTime + 4_500
+      };
+      long[] expectedValue = {2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
+
+      try (ResultSet resultSet =
+          statement.executeQuery("select precalculated_s1 from root.sg.d5")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR));
+          assertEquals(expectedValue[cnt], 
resultSet.getLong("root.sg.d5.precalculated_s1"));
+          cnt++;
+        }
+        assertEquals(expectedTime.length, cnt);
+      }
+
+      statement.execute("DROP CQ cq5");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
new file mode 100644
index 0000000000..49a4853190
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.cq;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category(ClusterIT.class)
+@Ignore
+public class IoTDBCQIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initBeforeClass();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+  }
+
+  // =======================================create 
cq======================================
+  @Test
+  public void testCreateWrongCQ() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      // 1. specify first parameter of group by time
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq \n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY([0, 10), 30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 2. specify time filter in where clause
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq \n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    WHERE time >= 0 and time <= 10\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 3. no every clause meanwhile no group by time
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq \n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 4. no INTO clause
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 5. EVERY interval is less than 
continuous_query_min_every_interval_in_ms in
+      // iotdb-confignode.properties
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE EVERY 50ms\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 6. start_time_offset < 0
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE RANGE -1m\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 7. start_time_offset == 0
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE RANGE 0m\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 8. end_time_offset < 0
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE RANGE 30m, -1m\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 9. end_time_offset == start_time_offset
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE RANGE 30m, 30m\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 10. end_time_offset > start_time_offset
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE RANGE 30m, 31m\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 11. group_by_interval > start_time_offset
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE RANGE 30m\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(1h)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 12. TIMEOUT POLICY is not BLOCKED or DISCARD
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE RANGE 30m\n"
+                + "TIMEOUT POLICY UNKNOWN\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(1h)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 13. create duplicated cq
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq \n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCreateCorrectCQ() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      try {
+        String sql =
+            "CREATE CQ correct_cq_1 \n"
+                + "RESAMPLE \n"
+                + "  EVERY 30m\n"
+                + "  BOUNDARY 0\n"
+                + "  RANGE 30m, 10m\n"
+                + "TIMEOUT POLICY BLOCKED\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+
+      try {
+        String sql =
+            "CREATE CQ correct_cq_2\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+
+      try {
+        String sql =
+            "CREATE CQ correct_cq_3\n"
+                + "RESAMPLE RANGE 30m, 0m\n"
+                + "TIMEOUT POLICY DISCARD\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(10m)\n"
+                + "END";
+        statement.execute(sql);
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_correct\n"
+                + "RESAMPLE EVERY 30m \n"
+                + "TIMEOUT POLICY DISCARD\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(10m)\n"
+                + "END";
+        statement.execute(sql);
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // =======================================show 
cq======================================
+  @Test
+  public void testShowCQ() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      String[] cqIds = {"show_cq_1", "show_cq_2", "show_cq_3", "show_cq_4"};
+      String[] cqSQLs = {
+        "CREATE CQ show_cq_1 \n"
+            + "RESAMPLE \n"
+            + "  EVERY 30m\n"
+            + "  BOUNDARY 0\n"
+            + "  RANGE 30m, 10m\n"
+            + "TIMEOUT POLICY BLOCKED\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(30m)\n"
+            + "END",
+        "CREATE CQ show_cq_2\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(30m)\n"
+            + "END",
+        "CREATE CQ show_cq_3\n"
+            + "RESAMPLE RANGE 30m, 0m\n"
+            + "TIMEOUT POLICY DISCARD\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(10m)\n"
+            + "END",
+        "CREATE CQ show_cq_4\n"
+            + "RESAMPLE EVERY 30m \n"
+            + "TIMEOUT POLICY DISCARD\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(10m)\n"
+            + "END"
+      };
+
+      for (String sql : cqSQLs) {
+        statement.execute(sql);
+      }
+
+      try (ResultSet resultSet = statement.executeQuery("show CQS")) {
+
+        int cnt = 0;
+        while (resultSet.next()) {
+          // No need to add time column for aggregation query
+          assertEquals(cqIds[cnt], resultSet.getString(0));
+          assertEquals(cqSQLs[cnt], resultSet.getString(1));
+          assertEquals("ACTIVE", resultSet.getString(2));
+          cnt++;
+        }
+        assertEquals(cqIds.length, cnt);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // =======================================drop 
cq======================================
+  @Test
+  public void testDropCQ() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      String[] cqIds = {"drop_cq_1", "drop_cq_2", "drop_cq_3", "drop_cq_4"};
+      String[] cqSQLs = {
+        "CREATE CQ drop_cq_1 \n"
+            + "RESAMPLE \n"
+            + "  EVERY 30m\n"
+            + "  BOUNDARY 0\n"
+            + "  RANGE 30m, 10m\n"
+            + "TIMEOUT POLICY BLOCKED\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(30m)\n"
+            + "END",
+        "CREATE CQ drop_cq_2\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(30m)\n"
+            + "END",
+        "CREATE CQ drop_cq_3\n"
+            + "RESAMPLE RANGE 30m, 0m\n"
+            + "TIMEOUT POLICY DISCARD\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(10m)\n"
+            + "END",
+        "CREATE CQ drop_cq_4\n"
+            + "RESAMPLE EVERY 30m \n"
+            + "TIMEOUT POLICY DISCARD\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(10m)\n"
+            + "END"
+      };
+
+      for (String sql : cqSQLs) {
+        statement.execute(sql);
+      }
+
+      try (ResultSet resultSet = statement.executeQuery("show CQS")) {
+
+        int cnt = 0;
+        while (resultSet.next()) {
+          // No need to add time column for aggregation query
+          assertEquals(cqIds[cnt], resultSet.getString(0));
+          assertEquals(cqSQLs[cnt], resultSet.getString(1));
+          assertEquals("ACTIVE", resultSet.getString(2));
+          cnt++;
+        }
+        assertEquals(cqIds.length, cnt);
+      }
+
+      statement.execute("DROP CQ drop_cq_2");
+      statement.execute("DROP CQ drop_cq_3");
+
+      int[] resultIndex = {0, 3};
+
+      try (ResultSet resultSet = statement.executeQuery("show CQS")) {
+
+        int cnt = 0;
+        while (resultSet.next()) {
+          // No need to add time column for aggregation query
+          assertEquals(cqIds[resultIndex[cnt]], resultSet.getString(0));
+          assertEquals(cqSQLs[resultIndex[cnt]], resultSet.getString(1));
+          assertEquals("ACTIVE", resultSet.getString(2));
+          cnt++;
+        }
+        assertEquals(resultIndex.length, cnt);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}


Reply via email to