This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch force_ci/alter_column_datatype
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/force_ci/alter_column_datatype 
by this push:
     new 7eff3955112 add concurrent test
7eff3955112 is described below

commit 7eff3955112046c3934dc14c03e12e51dfa72bea
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Jan 13 18:55:56 2025 +0800

    add concurrent test
---
 .../it/env/cluster/config/MppDataNodeConfig.java   |   8 +
 .../it/env/remote/config/RemoteDataNodeConfig.java |   5 +
 .../apache/iotdb/itbase/env/DataNodeConfig.java    |   2 +
 .../it/schema/IoTDBAlterColumnTypeIT.java          | 168 +++++++++++++++++++++
 4 files changed, 183 insertions(+)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 0f22e0d4286..e9966616e11 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -76,4 +76,12 @@ public class MppDataNodeConfig extends MppBaseConfig implements DataNodeConfig {
         String.valueOf(loadTsFileAnalyzeSchemaMemorySizeInBytes));
     return this;
   }
+
+  @Override
+  public DataNodeConfig setCompactionScheduleIntervalInMs(int 
compactionScheduleIntervalInMs) {
+    properties.setProperty(
+        "compaction_schedule_interval_in_ms",
+        String.valueOf(compactionScheduleIntervalInMs));
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index fe89997bc41..fd35be91261 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -43,4 +43,9 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
       long loadTsFileAnalyzeSchemaMemorySizeInBytes) {
     return this;
   }
+
+  @Override
+  public DataNodeConfig setCompactionScheduleIntervalInMs(int 
compactionScheduleIntervalInMs) {
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index 2887b0a9871..54bcff14341 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -31,4 +31,6 @@ public interface DataNodeConfig {
 
   DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
       long loadTsFileAnalyzeSchemaMemorySizeInBytes);
+
+  DataNodeConfig setCompactionScheduleIntervalInMs(int 
compactionScheduleIntervalInMs);
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
index 0d4b0202d80..e794d225000 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
@@ -19,6 +19,10 @@
 
 package org.apache.iotdb.relational.it.schema;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.commons.utils.MetadataUtils;
 import org.apache.iotdb.isession.ITableSession;
 import org.apache.iotdb.isession.SessionDataSet;
@@ -47,6 +51,7 @@ import static org.apache.iotdb.relational.it.session.IoTDBSessionRelationalIT.ge
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
@@ -55,6 +60,7 @@ public class IoTDBAlterColumnTypeIT {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    
EnvFactory.getEnv().getConfig().getDataNodeConfig().setCompactionScheduleIntervalInMs(1000);
     EnvFactory.getEnv().initClusterEnvironment();
     try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnection()) {
       session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS test");
@@ -326,6 +332,7 @@ public class IoTDBAlterColumnTypeIT {
     try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
       session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS 
drop_and_alter (s1 int32)");
 
+      // time=1 and time=2 are INT32 and deleted by drop column
       Tablet tablet =
           new Tablet(
               "drop_and_alter",
@@ -352,6 +359,7 @@ public class IoTDBAlterColumnTypeIT {
 
       session.executeNonQueryStatement("ALTER TABLE drop_and_alter DROP COLUMN 
s1");
 
+      // time=3 and time=4 are STRING
       tablet =
           new Tablet(
               "drop_and_alter",
@@ -379,6 +387,7 @@ public class IoTDBAlterColumnTypeIT {
       session.executeNonQueryStatement(
           "ALTER TABLE drop_and_alter ALTER COLUMN s1 SET DATA TYPE TEXT");
 
+      // time=5 and time=6 are TEXT
       tablet =
           new Tablet(
               "drop_and_alter",
@@ -405,6 +414,7 @@ public class IoTDBAlterColumnTypeIT {
 
       SessionDataSet dataSet =
           session.executeQueryStatement("select * from drop_and_alter order by 
time");
+      // s1 is dropped but the time should remain
       RowRecord rec;
       for (int i = 1; i < 3; i++) {
         rec = dataSet.next();
@@ -419,4 +429,162 @@ public class IoTDBAlterColumnTypeIT {
       assertFalse(dataSet.hasNext());
     }
   }
+
+  @Test
+  public void testContinuousAlter() throws IoTDBConnectionException, 
StatementExecutionException {
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
+      session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS 
alter_and_alter (s1 int32)");
+
+      // time=1 and time=2 are INT32
+      Tablet tablet =
+          new Tablet(
+              "alter_and_alter",
+              Collections.singletonList("s1"),
+              Collections.singletonList(TSDataType.INT32),
+              Collections.singletonList(ColumnCategory.FIELD));
+      tablet.addTimestamp(0, 1);
+      tablet.addValue("s1", 0, genValue(TSDataType.INT32, 1));
+      session.insert(tablet);
+      tablet.reset();
+
+      session.executeNonQueryStatement("FLUSH");
+
+      tablet =
+          new Tablet(
+              "alter_and_alter",
+              Collections.singletonList("s1"),
+              Collections.singletonList(TSDataType.INT32),
+              Collections.singletonList(ColumnCategory.FIELD));
+      tablet.addTimestamp(0, 2);
+      tablet.addValue("s1", 0, genValue(TSDataType.INT32, 2));
+      session.insert(tablet);
+      tablet.reset();
+
+      // time=3 and time=4 are FLOAT
+      session.executeNonQueryStatement("ALTER TABLE alter_and_alter ALTER 
COLUMN s1 SET DATA TYPE FLOAT");
+      tablet =
+          new Tablet(
+              "alter_and_alter",
+              Collections.singletonList("s1"),
+              Collections.singletonList(TSDataType.FLOAT),
+              Collections.singletonList(ColumnCategory.FIELD));
+      tablet.addTimestamp(0, 3);
+      tablet.addValue("s1", 0, genValue(TSDataType.FLOAT, 3));
+      session.insert(tablet);
+      tablet.reset();
+
+      session.executeNonQueryStatement("FLUSH");
+
+      tablet =
+          new Tablet(
+              "alter_and_alter",
+              Collections.singletonList("s1"),
+              Collections.singletonList(TSDataType.FLOAT),
+              Collections.singletonList(ColumnCategory.FIELD));
+      tablet.addTimestamp(0, 4);
+      tablet.addValue("s1", 0, genValue(TSDataType.FLOAT, 4));
+      session.insert(tablet);
+      tablet.reset();
+
+
+      // time=5 and time=6 are DOUBLE
+      session.executeNonQueryStatement(
+          "ALTER TABLE alter_and_alter ALTER COLUMN s1 SET DATA TYPE DOUBLE");
+      tablet =
+          new Tablet(
+              "alter_and_alter",
+              Collections.singletonList("s1"),
+              Collections.singletonList(TSDataType.DOUBLE),
+              Collections.singletonList(ColumnCategory.FIELD));
+      tablet.addTimestamp(0, 5);
+      tablet.addValue("s1", 0, genValue(TSDataType.DOUBLE, 5));
+      session.insert(tablet);
+      tablet.reset();
+
+      session.executeNonQueryStatement("FLUSH");
+
+      tablet =
+          new Tablet(
+              "alter_and_alter",
+              Collections.singletonList("s1"),
+              Collections.singletonList(TSDataType.DOUBLE),
+              Collections.singletonList(ColumnCategory.FIELD));
+      tablet.addTimestamp(0, 6);
+      tablet.addValue("s1", 0, genValue(TSDataType.DOUBLE, 6));
+      session.insert(tablet);
+      tablet.reset();
+
+      SessionDataSet dataSet =
+          session.executeQueryStatement("select * from alter_and_alter order 
by time");
+      RowRecord rec;
+      for (int i = 1; i < 7; i++) {
+        rec = dataSet.next();
+        assertEquals(i, rec.getFields().get(0).getLongV());
+        assertEquals(genValue(TSDataType.DOUBLE, i).toString(), 
rec.getFields().get(1).toString());
+      }
+      assertFalse(dataSet.hasNext());
+    }
+  }
+
+  @Test
+  public void testConcurrentWriteAndAlter()
+      throws IoTDBConnectionException, StatementExecutionException, 
InterruptedException {
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
+      session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS 
concurrent_write_and_alter (s1 int32)");
+      // session.executeNonQueryStatement("SET CONFIGURATION 
enable_seq_space_compaction='false'");
+    }
+
+    ExecutorService threadPool = Executors.newCachedThreadPool();
+    AtomicInteger writeCounter = new AtomicInteger(0);
+    int maxWrite = 10000;
+    int flushInterval = 100;
+    int alterStart = 5000;
+    threadPool.submit(() -> {
+      try {
+        write(writeCounter, maxWrite, flushInterval);
+      } catch (IoTDBConnectionException | StatementExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    threadPool.submit(() -> {
+      try {
+        alter(writeCounter, alterStart);
+      } catch (InterruptedException | IoTDBConnectionException | 
StatementExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    threadPool.shutdown();
+    assertTrue(threadPool.awaitTermination(1, TimeUnit.MINUTES));
+
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
+      SessionDataSet dataSet =
+          session.executeQueryStatement("select count(s1) from 
concurrent_write_and_alter");
+      RowRecord rec;
+      rec = dataSet.next();
+      assertEquals(maxWrite, rec.getFields().get(0).getLongV());
+      assertFalse(dataSet.hasNext());
+    }
+  }
+
+  private void write(AtomicInteger writeCounter, int maxWrite, int 
flushInterval)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
+      int writtenCnt = 0;
+      do {
+        session.executeNonQueryStatement(String.format("INSERT INTO 
concurrent_write_and_alter (time, s1) VALUES (%d, %d)", writtenCnt, 
writtenCnt));
+        if (((writtenCnt + 1) % flushInterval) == 0) {
+          session.executeNonQueryStatement("FLUSH");
+        }
+      } while ((writtenCnt = writeCounter.incrementAndGet()) < maxWrite);
+    }
+  }
+
+  private void alter(AtomicInteger writeCounter, int alterStart) throws 
InterruptedException, IoTDBConnectionException, StatementExecutionException {
+    while (writeCounter.get() < alterStart) {
+      Thread.sleep(10);
+    }
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
+      session.executeNonQueryStatement("ALTER TABLE concurrent_write_and_alter 
ALTER COLUMN s1 SET DATA TYPE DOUBLE");
+    }
+  }
 }

Reply via email to