This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch force_ci/alter_column_datatype
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/force_ci/alter_column_datatype
by this push:
new 7eff3955112 add concurrent test
7eff3955112 is described below
commit 7eff3955112046c3934dc14c03e12e51dfa72bea
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Jan 13 18:55:56 2025 +0800
add concurrent test
---
.../it/env/cluster/config/MppDataNodeConfig.java | 8 +
.../it/env/remote/config/RemoteDataNodeConfig.java | 5 +
.../apache/iotdb/itbase/env/DataNodeConfig.java | 2 +
.../it/schema/IoTDBAlterColumnTypeIT.java | 168 +++++++++++++++++++++
4 files changed, 183 insertions(+)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 0f22e0d4286..e9966616e11 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -76,4 +76,12 @@ public class MppDataNodeConfig extends MppBaseConfig
implements DataNodeConfig {
String.valueOf(loadTsFileAnalyzeSchemaMemorySizeInBytes));
return this;
}
+
+ @Override
+ public DataNodeConfig setCompactionScheduleIntervalInMs(int
compactionScheduleIntervalInMs) {
+ properties.setProperty(
+ "compaction_schedule_interval_in_ms",
+ String.valueOf(compactionScheduleIntervalInMs));
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index fe89997bc41..fd35be91261 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -43,4 +43,9 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
long loadTsFileAnalyzeSchemaMemorySizeInBytes) {
return this;
}
+
+ @Override
+ public DataNodeConfig setCompactionScheduleIntervalInMs(int
compactionScheduleIntervalInMs) {
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index 2887b0a9871..54bcff14341 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -31,4 +31,6 @@ public interface DataNodeConfig {
DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
long loadTsFileAnalyzeSchemaMemorySizeInBytes);
+
+ DataNodeConfig setCompactionScheduleIntervalInMs(int
compactionScheduleIntervalInMs);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
index 0d4b0202d80..e794d225000 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
@@ -19,6 +19,10 @@
package org.apache.iotdb.relational.it.schema;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.commons.utils.MetadataUtils;
import org.apache.iotdb.isession.ITableSession;
import org.apache.iotdb.isession.SessionDataSet;
@@ -47,6 +51,7 @@ import static
org.apache.iotdb.relational.it.session.IoTDBSessionRelationalIT.ge
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@@ -55,6 +60,7 @@ public class IoTDBAlterColumnTypeIT {
@BeforeClass
public static void setUp() throws Exception {
+
EnvFactory.getEnv().getConfig().getDataNodeConfig().setCompactionScheduleIntervalInMs(1000);
EnvFactory.getEnv().initClusterEnvironment();
try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS test");
@@ -326,6 +332,7 @@ public class IoTDBAlterColumnTypeIT {
try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS
drop_and_alter (s1 int32)");
+ // time=1 and time=2 are INT32 and deleted by drop column
Tablet tablet =
new Tablet(
"drop_and_alter",
@@ -352,6 +359,7 @@ public class IoTDBAlterColumnTypeIT {
session.executeNonQueryStatement("ALTER TABLE drop_and_alter DROP COLUMN
s1");
+ // time=3 and time=4 are STRING
tablet =
new Tablet(
"drop_and_alter",
@@ -379,6 +387,7 @@ public class IoTDBAlterColumnTypeIT {
session.executeNonQueryStatement(
"ALTER TABLE drop_and_alter ALTER COLUMN s1 SET DATA TYPE TEXT");
+ // time=5 and time=6 are TEXT
tablet =
new Tablet(
"drop_and_alter",
@@ -405,6 +414,7 @@ public class IoTDBAlterColumnTypeIT {
SessionDataSet dataSet =
session.executeQueryStatement("select * from drop_and_alter order by
time");
+ // s1 is dropped but the time should remain
RowRecord rec;
for (int i = 1; i < 3; i++) {
rec = dataSet.next();
@@ -419,4 +429,162 @@ public class IoTDBAlterColumnTypeIT {
assertFalse(dataSet.hasNext());
}
}
+
+ @Test
+ public void testContinuousAlter() throws IoTDBConnectionException,
StatementExecutionException {
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
+ session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS
alter_and_alter (s1 int32)");
+
+ // time=1 and time=2 are INT32
+ Tablet tablet =
+ new Tablet(
+ "alter_and_alter",
+ Collections.singletonList("s1"),
+ Collections.singletonList(TSDataType.INT32),
+ Collections.singletonList(ColumnCategory.FIELD));
+ tablet.addTimestamp(0, 1);
+ tablet.addValue("s1", 0, genValue(TSDataType.INT32, 1));
+ session.insert(tablet);
+ tablet.reset();
+
+ session.executeNonQueryStatement("FLUSH");
+
+ tablet =
+ new Tablet(
+ "alter_and_alter",
+ Collections.singletonList("s1"),
+ Collections.singletonList(TSDataType.INT32),
+ Collections.singletonList(ColumnCategory.FIELD));
+ tablet.addTimestamp(0, 2);
+ tablet.addValue("s1", 0, genValue(TSDataType.INT32, 2));
+ session.insert(tablet);
+ tablet.reset();
+
+ // time=3 and time=4 are FLOAT
+ session.executeNonQueryStatement("ALTER TABLE alter_and_alter ALTER
COLUMN s1 SET DATA TYPE FLOAT");
+ tablet =
+ new Tablet(
+ "alter_and_alter",
+ Collections.singletonList("s1"),
+ Collections.singletonList(TSDataType.FLOAT),
+ Collections.singletonList(ColumnCategory.FIELD));
+ tablet.addTimestamp(0, 3);
+ tablet.addValue("s1", 0, genValue(TSDataType.FLOAT, 3));
+ session.insert(tablet);
+ tablet.reset();
+
+ session.executeNonQueryStatement("FLUSH");
+
+ tablet =
+ new Tablet(
+ "alter_and_alter",
+ Collections.singletonList("s1"),
+ Collections.singletonList(TSDataType.FLOAT),
+ Collections.singletonList(ColumnCategory.FIELD));
+ tablet.addTimestamp(0, 4);
+ tablet.addValue("s1", 0, genValue(TSDataType.FLOAT, 4));
+ session.insert(tablet);
+ tablet.reset();
+
+
+ // time=5 and time=6 are DOUBLE
+ session.executeNonQueryStatement(
+ "ALTER TABLE alter_and_alter ALTER COLUMN s1 SET DATA TYPE DOUBLE");
+ tablet =
+ new Tablet(
+ "alter_and_alter",
+ Collections.singletonList("s1"),
+ Collections.singletonList(TSDataType.DOUBLE),
+ Collections.singletonList(ColumnCategory.FIELD));
+ tablet.addTimestamp(0, 5);
+ tablet.addValue("s1", 0, genValue(TSDataType.DOUBLE, 5));
+ session.insert(tablet);
+ tablet.reset();
+
+ session.executeNonQueryStatement("FLUSH");
+
+ tablet =
+ new Tablet(
+ "alter_and_alter",
+ Collections.singletonList("s1"),
+ Collections.singletonList(TSDataType.DOUBLE),
+ Collections.singletonList(ColumnCategory.FIELD));
+ tablet.addTimestamp(0, 6);
+ tablet.addValue("s1", 0, genValue(TSDataType.DOUBLE, 6));
+ session.insert(tablet);
+ tablet.reset();
+
+ SessionDataSet dataSet =
+ session.executeQueryStatement("select * from alter_and_alter order
by time");
+ RowRecord rec;
+ for (int i = 1; i < 7; i++) {
+ rec = dataSet.next();
+ assertEquals(i, rec.getFields().get(0).getLongV());
+ assertEquals(genValue(TSDataType.DOUBLE, i).toString(),
rec.getFields().get(1).toString());
+ }
+ assertFalse(dataSet.hasNext());
+ }
+ }
+
+ @Test
+ public void testConcurrentWriteAndAlter()
+ throws IoTDBConnectionException, StatementExecutionException,
InterruptedException {
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
+ session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS
concurrent_write_and_alter (s1 int32)");
+ // session.executeNonQueryStatement("SET CONFIGURATION
enable_seq_space_compaction='false'");
+ }
+
+ ExecutorService threadPool = Executors.newCachedThreadPool();
+ AtomicInteger writeCounter = new AtomicInteger(0);
+ int maxWrite = 10000;
+ int flushInterval = 100;
+ int alterStart = 5000;
+ threadPool.submit(() -> {
+ try {
+ write(writeCounter, maxWrite, flushInterval);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ threadPool.submit(() -> {
+ try {
+ alter(writeCounter, alterStart);
+ } catch (InterruptedException | IoTDBConnectionException |
StatementExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ threadPool.shutdown();
+ assertTrue(threadPool.awaitTermination(1, TimeUnit.MINUTES));
+
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
+ SessionDataSet dataSet =
+ session.executeQueryStatement("select count(s1) from
concurrent_write_and_alter");
+ RowRecord rec;
+ rec = dataSet.next();
+ assertEquals(maxWrite, rec.getFields().get(0).getLongV());
+ assertFalse(dataSet.hasNext());
+ }
+ }
+
+ private void write(AtomicInteger writeCounter, int maxWrite, int
flushInterval)
+ throws IoTDBConnectionException, StatementExecutionException {
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
+ int writtenCnt = 0;
+ do {
+ session.executeNonQueryStatement(String.format("INSERT INTO
concurrent_write_and_alter (time, s1) VALUES (%d, %d)", writtenCnt,
writtenCnt));
+ if (((writtenCnt + 1) % flushInterval) == 0) {
+ session.executeNonQueryStatement("FLUSH");
+ }
+ } while ((writtenCnt = writeCounter.incrementAndGet()) < maxWrite);
+ }
+ }
+
+ private void alter(AtomicInteger writeCounter, int alterStart) throws
InterruptedException, IoTDBConnectionException, StatementExecutionException {
+ while (writeCounter.get() < alterStart) {
+ Thread.sleep(10);
+ }
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
+ session.executeNonQueryStatement("ALTER TABLE concurrent_write_and_alter
ALTER COLUMN s1 SET DATA TYPE DOUBLE");
+ }
+ }
}