This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 50a8910fab [To rel/0.13] Fix IoTDBArchivingIT (#7860)
50a8910fab is described below
commit 50a8910fab46fb86329d68237cc0b5d5048e1c1b
Author: Alan Choo <[email protected]>
AuthorDate: Wed Nov 2 18:59:01 2022 +0800
[To rel/0.13] Fix IoTDBArchivingIT (#7860)
---
.../iotdb/db/integration/IoTDBArchivingIT.java | 100 +++++++++++++++------
1 file changed, 71 insertions(+), 29 deletions(-)
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
index fad6102139..d8d5a44d33 100644
---
a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBArchivingIT.java
@@ -67,9 +67,8 @@ public class IoTDBArchivingIT {
@Test
@Category({ClusterTest.class})
- public void testArchiving() throws SQLException, InterruptedException {
+ public void testArchive2NonexistentSG() throws SQLException {
StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(ARCHIVING_CHECK_TIME);
-
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
try {
@@ -93,19 +92,19 @@ public class IoTDBArchivingIT {
} catch (SQLException e) {
assertEquals(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(),
e.getErrorCode());
}
+ }
+ }
+ @Test
+ @Category({ClusterTest.class})
+ public void testArchiveDataInRange() throws SQLException,
InterruptedException {
+
StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(ARCHIVING_CHECK_TIME);
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
statement.execute("SET STORAGE GROUP TO root.ARCHIVING_SG1");
statement.execute(
"CREATE TIMESERIES root.ARCHIVING_SG1.s1 WITH DATATYPE=INT32,
ENCODING=PLAIN");
-
- try {
- statement.execute("SET ARCHIVING TO storage_group=root.ARCHIVING_SG1");
- } catch (SQLException e) {
- assertEquals(TSStatusCode.METADATA_ERROR.getStatusCode(),
e.getErrorCode());
- }
-
- // test set when ttl is in range
-
+ // prepare data
long now = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
statement.execute(
@@ -113,7 +112,6 @@ public class IoTDBArchivingIT {
"INSERT INTO root.ARCHIVING_SG1(timestamp, s1) VALUES (%d,
%d)",
now - 100000 + i, i));
}
-
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM
root.ARCHIVING_SG1")) {
int cnt = 0;
while (resultSet.next()) {
@@ -121,8 +119,7 @@ public class IoTDBArchivingIT {
}
assertEquals(100, cnt);
}
-
- // test set when ttl isn't in range
+ // archive data
StorageEngine.getInstance().syncCloseAllProcessor();
statement.execute(
@@ -137,16 +134,26 @@ public class IoTDBArchivingIT {
}
assertEquals(0, cnt);
}
+ }
+ }
- // test pause archive
-
+ @Test
+ @Category({ClusterTest.class})
+ public void testArchiveDataNotInRange() throws SQLException,
InterruptedException {
+
StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(ARCHIVING_CHECK_TIME);
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("SET STORAGE GROUP TO root.ARCHIVING_SG1");
+ statement.execute(
+ "CREATE TIMESERIES root.ARCHIVING_SG1.s1 WITH DATATYPE=INT32,
ENCODING=PLAIN");
+ // prepare data
+ long now = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
statement.execute(
String.format(
"INSERT INTO root.ARCHIVING_SG1(timestamp, s1) VALUES (%d,
%d)",
now - 5000 + i, i));
}
-
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM
root.ARCHIVING_SG1")) {
int cnt = 0;
while (resultSet.next()) {
@@ -154,7 +161,7 @@ public class IoTDBArchivingIT {
}
assertEquals(100, cnt);
}
-
+ // archive data
StorageEngine.getInstance().syncCloseAllProcessor();
statement.execute(
@@ -171,7 +178,34 @@ public class IoTDBArchivingIT {
}
assertEquals(100, cnt);
}
+ }
+ }
+ @Test
+ @Category({ClusterTest.class})
+ public void testPauseAndResumeArchiving() throws SQLException,
InterruptedException {
+
StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(ARCHIVING_CHECK_TIME);
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("SET STORAGE GROUP TO root.ARCHIVING_SG1");
+ statement.execute(
+ "CREATE TIMESERIES root.ARCHIVING_SG1.s1 WITH DATATYPE=INT32,
ENCODING=PLAIN");
+ // prepare data
+ long now = System.currentTimeMillis();
+ for (int i = 0; i < 100; i++) {
+ statement.execute(
+ String.format(
+ "INSERT INTO root.ARCHIVING_SG1(timestamp, s1) VALUES (%d,
%d)",
+ now - 5000 + i, i));
+ }
+ try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM
root.ARCHIVING_SG1")) {
+ int cnt = 0;
+ while (resultSet.next()) {
+ cnt++;
+ }
+ assertEquals(100, cnt);
+ }
+ // pause data archiving
StorageEngine.getInstance().syncCloseAllProcessor();
StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(Long.MAX_VALUE);
@@ -182,7 +216,7 @@ public class IoTDBArchivingIT {
StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(ARCHIVING_CHECK_TIME);
- waitUntilAllFinished();
+ Thread.sleep(ARCHIVING_CHECK_TIME * 2);
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM
root.ARCHIVING_SG1")) {
int cnt = 0;
@@ -192,9 +226,7 @@ public class IoTDBArchivingIT {
assertEquals(100, cnt);
}
- StorageEngine.getInstance().syncCloseAllProcessor();
-
- // test resume archive
+ // resume data archiving
statement.execute("RESUME ARCHIVING ON root.ARCHIVING_SG1");
waitUntilAllFinished();
@@ -206,16 +238,26 @@ public class IoTDBArchivingIT {
}
assertEquals(0, cnt);
}
+ }
+ }
- // test cancel archive
-
+ @Test
+ @Category({ClusterTest.class})
+ public void testCancelArchiving() throws SQLException, InterruptedException {
+
StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(ARCHIVING_CHECK_TIME);
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("SET STORAGE GROUP TO root.ARCHIVING_SG1");
+ statement.execute(
+ "CREATE TIMESERIES root.ARCHIVING_SG1.s1 WITH DATATYPE=INT32,
ENCODING=PLAIN");
+ // prepare data
+ long now = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
statement.execute(
String.format(
"INSERT INTO root.ARCHIVING_SG1(timestamp, s1) VALUES (%d,
%d)",
now - 5000 + i, i));
}
-
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM
root.ARCHIVING_SG1")) {
int cnt = 0;
while (resultSet.next()) {
@@ -223,7 +265,7 @@ public class IoTDBArchivingIT {
}
assertEquals(100, cnt);
}
-
+ // test cancel archive
StorageEngine.getInstance().syncCloseAllProcessor();
StorageEngine.getInstance().getArchivingManager().setCheckThreadTime(Long.MAX_VALUE);
@@ -247,10 +289,10 @@ public class IoTDBArchivingIT {
}
private void waitUntilAllFinished() throws InterruptedException {
- int cnt = 10;
for (ArchivingTask task :
ArchivingManager.getInstance().getArchivingTasks()) {
- if (task.getStatus() != ArchivingTask.ArchivingTaskStatus.FINISHED) {
- Thread.sleep(ARCHIVING_CHECK_TIME * 2);
+ int cnt = 0;
+ while (task.isActive()) {
+ Thread.sleep(ARCHIVING_CHECK_TIME * 10);
cnt++;
if (cnt >= 50) {
throw new RuntimeException("Wait too long for all archiving task
finished.");