This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch force_ci/support_schema_evolution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7617259da036fd34f111896c22abb00395ac3193 Author: Tian Jiang <[email protected]> AuthorDate: Mon Jan 19 11:23:12 2026 +0800 add more tests --- integration-test/pom.xml | 2 +- .../iotdb/itbase/runtime/RequestDelegate.java | 7 +- .../iotdb/relational/it/schema/IoTDBTableIT.java | 444 +++++++++++++++++---- .../db/schemaengine/table/DataNodeTableCache.java | 8 +- .../db/storageengine/dataregion/DataRegion.java | 2 + 5 files changed, 390 insertions(+), 73 deletions(-) diff --git a/integration-test/pom.xml b/integration-test/pom.xml index f566c8e5995..647ad21d76b 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -417,7 +417,7 @@ <profile> <id>SimpleIT</id> <activation> - <activeByDefault>true</activeByDefault> + <activeByDefault>true</activeByDefault> </activation> <properties> <integrationTest.excludedGroups>org.apache.iotdb.itbase.category.ManualIT</integrationTest.excludedGroups> diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/RequestDelegate.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/RequestDelegate.java index 47f31004e6f..78e979d5c25 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/RequestDelegate.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/RequestDelegate.java @@ -126,6 +126,10 @@ public abstract class RequestDelegate<T> { break; } } + + if (!exceptionInconsistent && exceptionMsg[0] != null) { + throw new SQLException(exceptionMsg[0]); + } for (int i = 0; i < businessExceptions.length; i++) { if (businessExceptions[i] != null) { // As each exception has its own stacktrace, in order to display them clearly, we can only @@ -134,9 +138,6 @@ public abstract class RequestDelegate<T> { "Exception happens during request to {}", getEndpoints().get(i), businessExceptions[i]); } } - if (!exceptionInconsistent && exceptionMsg[0] != null) { - throw new SQLException(exceptionMsg[0]); - } if (exceptionInconsistent) { throw new InconsistentDataException(Arrays.asList(exceptionMsg), getEndpoints()); } diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java index 803bfb5d10d..c70342a8cfa 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java @@ -19,6 +19,7 @@ package org.apache.iotdb.relational.it.schema; +import java.util.stream.Collectors; import org.apache.iotdb.commons.utils.WindowsOSUtils; import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.isession.ITableSession; @@ -33,6 +34,8 @@ import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.external.commons.lang3.SystemUtils; +import org.apache.tsfile.file.metadata.ColumnSchema; +import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -1642,92 +1645,397 @@ public class IoTDBTableIT { } } - @Test - public void testAlterTableAndColumn_RenameTableThenColumn() throws Exception { - try (final Connection connection = - EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); - final Statement statement = connection.createStatement()) { - statement.execute("DROP DATABASE IF EXISTS testdb"); - statement.execute("CREATE DATABASE IF NOT EXISTS testdb"); - statement.execute("USE testdb"); + // Helper: recognize SQLExceptions that mean the target table/device cannot be found. + private static boolean isTableNotFound(final SQLException e) { + if (e == null) return false; + final String msg = e.getMessage(); + if (msg == null) return false; + final String lm = msg.toLowerCase(); + // code 550 is commonly used for 'does not exist' in this project; also match textual phrases + return msg.startsWith("550") || lm.contains("not exist"); + } - statement.execute("CREATE TABLE IF NOT EXISTS tboth (s1 int32)"); - statement.execute("INSERT INTO tboth (time, s1) VALUES (1, 1)"); + @Test(timeout = 120000) + @SuppressWarnings("resource") + public void testConcurrentRenameVsQueries() throws Throwable { + try (final Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement stmt = connection.createStatement()) { + final String db = "concrenamedb"; + final int tableCount = 6; + final int rows = 50; + stmt.execute("DROP DATABASE IF EXISTS " + db); + stmt.execute("CREATE DATABASE IF NOT EXISTS " + db); + stmt.execute("USE " + db); + + final String[] names = new String[tableCount]; + for (int i = 0; i < tableCount; i++) { + names[i] = "crtable" + i; + stmt.execute(String.format("CREATE TABLE IF NOT EXISTS %s (v int32)", names[i])); + for (int r = 1; r <= rows; r++) { + stmt.execute(String.format("INSERT INTO %s (time, v) VALUES (%d, %d)", names[i], r, r)); + } + } - // rename table first - statement.execute("ALTER TABLE tboth RENAME TO tboth_new"); + final java.util.concurrent.atomic.AtomicReference<Throwable> err = new java.util.concurrent.atomic.AtomicReference<>(); + final java.util.concurrent.CountDownLatch startLatch = new java.util.concurrent.CountDownLatch(1); + final java.util.concurrent.CountDownLatch doneLatch = new java.util.concurrent.CountDownLatch(4); + + java.util.concurrent.ExecutorService exec = null; + try { + exec = java.util.concurrent.Executors.newFixedThreadPool(8); + + // Renamer task: rotate rename a subset of tables repeatedly + exec.submit(() -> { + try (final Connection c = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement s = c.createStatement()) { + startLatch.await(); + // ensure this thread's connection uses the test database + try { + s.execute("USE " + db); + } catch (final SQLException ignore) { + } + for (int round = 0; round < 20 && err.get() == null; round++) { + for (int i = 0; i < tableCount / 2; i++) { + final String oldName = names[i]; + final String newName = oldName + "_r" + round; + try { + s.execute(String.format("ALTER TABLE %s RENAME TO %s", oldName, newName)); + // reflect change locally so queries target updated names + names[i] = newName; + } catch (final SQLException ex) { + // Only ignore if the failure is due to table not existing; otherwise record the error + if (isTableNotFound(ex)) { + // table not found: likely a transient race with concurrent rename — ignore and log + System.out.println("Ignored table-not-found during rename: " + ex.getMessage()); + } else { + err.compareAndSet(null, ex); + } + } + } + try { + Thread.sleep(50); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } catch (final Throwable t) { + err.compareAndSet(null, t); + } finally { + doneLatch.countDown(); + } + }); + + // Queryer tasks: continuously query random tables + for (int q = 0; q < 2; q++) { + exec.submit(() -> { + try (final Connection c = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement s = c.createStatement()) { + final java.util.Random rnd = new java.util.Random(); + startLatch.await(); + // ensure this thread's connection uses the test database + try { + s.execute("USE " + db); + } catch (final SQLException ignore) { + } + for (int iter = 0; iter < 200 && err.get() == null; iter++) { + final int idx = rnd.nextInt(tableCount); + final String tname = names[idx]; + try (final ResultSet rs = s.executeQuery("SELECT count(*) FROM " + tname)) { + if (rs.next()) { + rs.getLong(1); + } + } catch (final SQLException ex) { + // Only ignore table-not-found; otherwise surface the error to fail the test + if (!isTableNotFound(ex)) { + err.compareAndSet(null, ex); + break; + } + } + } + } catch (final Throwable t) { + err.compareAndSet(null, t); + } finally { + doneLatch.countDown(); + } + }); + } - // then rename column on the new table - statement.execute("ALTER TABLE tboth_new RENAME COLUMN s1 TO s_new"); + // Another queryer to trigger more parallel access + exec.submit(() -> { + try (final Connection c = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement s = c.createStatement()) { + startLatch.await(); + // ensure this thread's connection uses the test database + try { + s.execute("USE " + db); + } catch (final SQLException ignore) { + } + for (int iter = 0; iter < 200 && err.get() == null; iter++) { + for (int i = 0; i < tableCount; i++) { + try (final ResultSet rs = s.executeQuery("SELECT * FROM " + names[i] + " LIMIT 1")) { + // consume + while (rs.next()) { + rs.getLong(1); + } + } catch (final SQLException ex) { + if (!isTableNotFound(ex)) { + err.compareAndSet(null, ex); + break; + } + } + } + } + } catch (final Throwable t) { + err.compareAndSet(null, t); + } finally { + doneLatch.countDown(); + } + }); + + // start + startLatch.countDown(); + // wait for tasks + doneLatch.await(); + + if (err.get() != null) { + throw err.get(); + } + } finally { + if (exec != null) { + exec.shutdownNow(); + } + } + } + } - // old table name should not exist anymore - try { - statement.execute("INSERT INTO tboth (time, s1) VALUES (2, 2)"); - fail(); - } catch (final SQLException e) { - assertTrue(e.getMessage().startsWith("550") || e.getMessage().toLowerCase().contains("does not exist")); + @Test + public void testMultiTableCrossCheckAfterRenames() throws Exception { + try (final Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement stmt = connection.createStatement()) { + final String db = "multicheckdb"; + stmt.execute("DROP DATABASE IF EXISTS " + db); + stmt.execute("CREATE DATABASE IF NOT EXISTS " + db); + stmt.execute("USE " + db); + + // create two related tables + stmt.execute("CREATE TABLE IF NOT EXISTS mta (k int32)"); + stmt.execute("CREATE TABLE IF NOT EXISTS mtb (k int32)"); + + for (int i = 1; i <= 10; i++) { + stmt.execute(String.format("INSERT INTO mta (time, k) VALUES (%d, %d)", i, i)); + stmt.execute(String.format("INSERT INTO mtb (time, k) VALUES (%d, %d)", i, i)); + } + + // baseline: read aggregates + long aCount = 0, bCount = 0; + try (final ResultSet ra = stmt.executeQuery("SELECT count(*) FROM mta")) { + if (ra.next()) { + aCount = ra.getLong(1); + } + } + try (final ResultSet rb = stmt.executeQuery("SELECT count(*) FROM mtb")) { + if (rb.next()) { + bCount = rb.getLong(1); + } } - // insert into the renamed table using the renamed column - statement.execute("INSERT INTO tboth_new (time, s_new) VALUES (2, 2)"); + // rename one table and verify cross results remain consistent when queried separately + stmt.execute("ALTER TABLE mtb RENAME TO mtb_renamed"); - ResultSet rs = statement.executeQuery("SELECT * FROM tboth_new"); - // first row from original name should be present and second row inserted after renames - assertTrue(rs.next()); - assertEquals(1, rs.getLong(1)); - assertEquals(1, rs.getInt(2)); - assertTrue(rs.next()); - assertEquals(2, rs.getLong(1)); - assertEquals(2, rs.getInt(2)); - assertFalse(rs.next()); + long bCountAfter = 0; + try (final ResultSet rb2 = stmt.executeQuery("SELECT count(*) FROM mtb_renamed")) { + if (rb2.next()) { + bCountAfter = rb2.getLong(1); + } + } + + // assert counts unchanged + assertEquals(bCount, bCountAfter); + assertEquals(10, aCount); + + // rename the other table and verify again + stmt.execute("ALTER TABLE mta RENAME TO mta_renamed"); + long aCountAfter = 0; + try (final ResultSet ra2 = stmt.executeQuery("SELECT count(*) FROM mta_renamed")) { + if (ra2.next()) { + aCountAfter = ra2.getLong(1); + } + } + assertEquals(aCount, aCountAfter); } } @Test - public void testAlterTableAndColumn_RenameColumnThenTable() throws Exception { - try (final Connection connection = - EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); - final Statement statement = connection.createStatement()) { - statement.execute("DROP DATABASE IF EXISTS testdb"); - statement.execute("CREATE DATABASE IF NOT EXISTS testdb"); - statement.execute("USE testdb"); + public void testPerformanceWithQuotedSpecialNameRenames() throws Exception { + try (final Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement stmt = connection.createStatement(); + final ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + final String db = "perfquotedb"; + final int tables = 3200; + final int rows = 100; + final int numFile = 5; + final int colPerTable = 100; + stmt.execute("DROP DATABASE IF EXISTS " + db); + stmt.execute("CREATE DATABASE IF NOT EXISTS " + db); + stmt.execute("USE " + db); + session.executeNonQueryStatement("USE " + db); + + final String[] names = new String[tables]; + StringBuilder createTableTemplate = new StringBuilder("CREATE TABLE IF NOT EXISTS %s ("); + for (int c = 0; c < colPerTable; c++) { + createTableTemplate.append(String.format("v%d int32,", c)); + } + createTableTemplate = new StringBuilder( + createTableTemplate.substring(0, createTableTemplate.length() - 1) + ")"); + List<ColumnSchema> columns = new ArrayList<>(); + for (int i = 0; i < colPerTable; i++) { + columns.add(new ColumnSchema("v" + i, TSDataType.INT32, ColumnCategory.FIELD)); + } + TableSchema tableSchema = new TableSchema( + "", // place holder + columns + ); + + System.out.println("Start data preparation..."); + for (int i = 0; i < tables; i++) { + names[i] = "qtable" + i; + stmt.execute(String.format(createTableTemplate.toString(), names[i])); + tableSchema.setTableName(names[i]); + Tablet tablet = new Tablet(tableSchema.getTableName(), tableSchema.getColumnSchemas().stream().map(IMeasurementSchema::getMeasurementName).collect( + Collectors.toList()), tableSchema.getColumnSchemas().stream().map(IMeasurementSchema::getType).collect( + Collectors.toList()), tableSchema.getColumnTypes(), rows); + for (int j = 0; j < numFile; j++) { + tablet.reset(); + for (int r = 1; r <= rows; r++) { + tablet.addTimestamp(r - 1, r + j * rows); + for (int c = 0; c < colPerTable; c++) { + tablet.addValue(r - 1, c, r + j * rows); + } + } + session.insert(tablet); + stmt.execute("FLUSH"); + } + } + System.out.println("Data preparation done."); + + // baseline measurement: simple average over a few runs + final int runs = 100; + double totalMs = 0.0; + for (int run = 0; run < runs; run++) { + final long start = System.nanoTime(); + for (int i = 0; i < tables; i++) { + try (final ResultSet rs = stmt.executeQuery("SELECT count(*) FROM " + names[i])) { + if (rs.next()) { + rs.getLong(1); + } + } + } + final long end = System.nanoTime(); + if (run > runs * 0.1) { + totalMs += (end - start) / 1_000_000.0; + } + } + final double baseline = totalMs / (runs * 0.9); + System.out.println("baseline_total_ms=" + String.format("%.3f", baseline)); + + // rename half of them to quoted special names and measure again + for (int i = 0; i < tables / 2; i++) { + final String oldName = names[i]; + final String newName = "\"" + oldName + "-特\""; // quoted name + stmt.execute(String.format("ALTER TABLE %s RENAME TO %s", oldName, newName)); + names[i] = newName; + } + + totalMs = 0.0; + for (int run = 0; run < runs; run++) { + final long start = System.nanoTime(); + for (int i = 0; i < tables; i++) { + try (final ResultSet rs = stmt.executeQuery("SELECT count(*) FROM " + names[i])) { + if (rs.next()) { + rs.getLong(1); + } + } + } + final long end = System.nanoTime(); + if (run > runs * 0.1) { + totalMs += (end - start) / 1_000_000.0; + } + } + final double after = totalMs / (runs * 0.9); + System.out.println("after_quoted_total_ms=" + String.format("%.3f", after)); + + // basic sanity: ensure queries still return counts + for (int i = 0; i < tables; i++) { + try (final ResultSet rs = stmt.executeQuery("SELECT count(*) FROM " + names[i])) { + assertTrue(rs.next()); + assertEquals(rows * numFile, rs.getLong(1)); + } + } + } + } - statement.execute("CREATE TABLE IF NOT EXISTS tboth2 (c1 int32)"); - statement.execute("INSERT INTO tboth2 (time, c1) VALUES (1, 1)"); + @Test + public void testAlterTableAndColumnTogether() throws Exception { + try (final Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement stmt = connection.createStatement()) { + final String db = "dualalterdb"; + stmt.execute("DROP DATABASE IF EXISTS " + db); + stmt.execute("CREATE DATABASE IF NOT EXISTS " + db); + stmt.execute("USE " + db); + + stmt.execute("CREATE TABLE IF NOT EXISTS tab1 (c1 int32, c2 int32)"); + stmt.execute("INSERT INTO tab1 (time, c1, c2) VALUES (1, 1, 10)"); + + // rename column first and then rename table + stmt.execute("ALTER TABLE tab1 RENAME COLUMN c1 TO c1_new"); + stmt.execute("ALTER TABLE tab1 RENAME TO tab1_new"); + + // old table name should not exist + try { + stmt.execute("INSERT INTO tab1 (time, c1_new) VALUES (2, 2)"); + fail(); + } catch (final SQLException e) { + assertTrue(e.getMessage().startsWith("550") || e.getMessage().toLowerCase().contains("does not exist")); + } - // rename column first - statement.execute("ALTER TABLE tboth2 RENAME COLUMN c1 TO c2"); + // inserting using new table and new column names should succeed + stmt.execute("INSERT INTO tab1_new (time, c1_new, c2) VALUES (2, 2, 20)"); - // after column rename, old column should not be auto-creatable - try { - statement.execute("INSERT INTO tboth2 (time, c1) VALUES (2, 2)"); - fail(); - } catch (final SQLException e) { - assertTrue(e.getMessage().startsWith("616") || e.getMessage().toLowerCase().contains("unknown") || e.getMessage().toLowerCase().contains("cannot")); - } + // verify data + try (final ResultSet rs = stmt.executeQuery("SELECT * FROM tab1_new ORDER BY time")) { + assertTrue(rs.next()); + assertEquals(1, rs.getLong(1)); + assertEquals(1, rs.getInt("c1_new")); + assertEquals(10, rs.getInt("c2")); - // then rename the table - statement.execute("ALTER TABLE tboth2 RENAME TO tboth2_new"); + assertTrue(rs.next()); + assertEquals(2, rs.getLong(1)); + assertEquals(2, rs.getInt("c1_new")); + assertEquals(20, rs.getInt("c2")); - // old table name should not exist - try { - statement.execute("INSERT INTO tboth2 (time, c2) VALUES (3, 3)"); - fail(); - } catch (final SQLException e) { - assertTrue(e.getMessage().startsWith("550") || e.getMessage().toLowerCase().contains("does not exist")); - } + assertFalse(rs.next()); + } - // insert into the new table using the renamed column - statement.execute("INSERT INTO tboth2_new (time, c2) VALUES (2, 2)"); - statement.execute("INSERT INTO tboth2_new (time, c2) VALUES (3, 3)"); + // rename column again on the renamed table and verify + stmt.execute("ALTER TABLE tab1_new RENAME COLUMN c1_new TO c1_final"); + try { + // old column identifier should fail + stmt.execute("INSERT INTO tab1_new (time, c1_new) VALUES (3, 3)"); + fail(); + } catch (final SQLException e) { + assertTrue(e.getMessage().startsWith("616") || e.getMessage().toLowerCase().contains("unknown") || e.getMessage().toLowerCase().contains("cannot be resolved")); + } - ResultSet rs = statement.executeQuery("SELECT * FROM tboth2_new ORDER BY time"); - for (int i = 1; i <= 3; i++) { - assertTrue(rs.next()); - assertEquals(i, rs.getLong(1)); - assertEquals(i, rs.getInt(2)); + // use final name + stmt.execute("INSERT INTO tab1_new (time, c1_final, c2) VALUES (3, 3, 30)"); + try (final ResultSet rs = stmt.executeQuery("SELECT count(*) FROM tab1_new")) { + if (rs.next()) { + assertEquals(3L, rs.getLong(1)); + } else { + fail(); + } + } } - assertFalse(rs.next()); - } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java index 9beab9bcfff..ac078e17a96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java @@ -216,7 +216,13 @@ public class DataNodeTableCache implements ITableCache { database = PathUtils.unQualifyDatabaseName(database); readWriteLock.writeLock().lock(); try { - final TsTable newTable = preUpdateTableMap.get(database).get(tableName).getLeft(); + final TsTable newTable = preUpdateTableMap.getOrDefault(database, Collections.emptyMap()).getOrDefault(tableName, new Pair<>( + null, 0L)).getLeft(); + if (newTable == null) { + // someone invalidated the table before commit + // let the latter operation fetch it + return; + } // Cannot be committed, consider: // 1. Fetched a non-changed CN table // 2. CN is changed diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index be0f7c3e5ca..7bbf1964d90 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1264,6 +1264,8 @@ public class DataRegion implements IDataRegionForQuery { return; } + DataNodeTableCache.getInstance().invalid(databaseName); + syncCloseAllWorkingTsFileProcessors(); // may update table names in deviceIds
