This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch force_ci/support_schema_evolution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cdd0b878d542bd531163f79b072b1965b2927103 Author: Tian Jiang <[email protected]> AuthorDate: Tue Jan 20 09:58:33 2026 +0800 reduce lock contention --- .../iotdb/relational/it/schema/IoTDBTableIT.java | 8 ++--- .../db/schemaengine/table/DataNodeTableCache.java | 8 +---- .../db/storageengine/dataregion/DataRegion.java | 2 -- .../dataregion/tsfile/TsFileManager.java | 34 +++++++++------------- 4 files changed, 18 insertions(+), 34 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java index 25666f8181f..cf1bc59c776 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java @@ -1747,14 +1747,15 @@ public class IoTDBTableIT { final Statement stmt = connection.createStatement(); final ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { final String db = "perfquotedb"; - final int colPerTable = 1; - final int tables = 8; + final int colPerTable = 100; + final int tables = 3200; final int rows = 100; final int numFile = 5; + final int runs = 30; stmt.execute("DROP DATABASE IF EXISTS " + db); stmt.execute("CREATE DATABASE IF NOT EXISTS " + db); stmt.execute("USE " + db); - //stmt.execute("set configuration enable_seq_space_compaction='false'"); + // stmt.execute("set configuration enable_seq_space_compaction='false'"); session.executeNonQueryStatement("USE " + db); final String[] names = new String[tables]; @@ -1805,7 +1806,6 @@ public class IoTDBTableIT { System.out.println("Data preparation done."); // baseline measurement: simple average over a few runs - final int runs = 100; double totalMs = 0.0; for (int run = 0; run < runs; run++) { final long start = System.nanoTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java index ac078e17a96..9beab9bcfff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java @@ -216,13 +216,7 @@ public class DataNodeTableCache implements ITableCache { database = PathUtils.unQualifyDatabaseName(database); readWriteLock.writeLock().lock(); try { - final TsTable newTable = preUpdateTableMap.getOrDefault(database, Collections.emptyMap()).getOrDefault(tableName, new Pair<>( - null, 0L)).getLeft(); - if (newTable == null) { - // someone invalidated the table before commit - // let the latter operation fetch it - return; - } + final TsTable newTable = preUpdateTableMap.get(database).get(tableName).getLeft(); // Cannot be committed, consider: // 1. Fetched a non-changed CN table // 2. CN is changed diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index ebcfd0c040a..4124626c64a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1264,8 +1264,6 @@ public class DataRegion implements IDataRegionForQuery { return; } - DataNodeTableCache.getInstance().invalid(databaseName); - syncCloseAllWorkingTsFileProcessors(); // may update table names in deviceIds diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java index 8f74c561a88..fe45bd7f1ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.storageengine.dataregion.modification.ModFileManagement; @@ -58,7 +60,7 @@ public class TsFileManager { private final TreeMap<Long, TsFileResourceList> sequenceFiles = new TreeMap<>(); private final TreeMap<Long, TsFileResourceList> unsequenceFiles = new TreeMap<>(); private final TreeMap<Long, ModFileManagement> modFileManagementMap = new TreeMap<>(); - private final TreeMap<Long, List<TsFileSet>> tsfileSets = new TreeMap<>(); + private final Map<Long, List<TsFileSet>> tsfileSets = new ConcurrentSkipListMap<>(); private volatile boolean allowCompaction = true; private final AtomicLong currentCompactionTaskSerialId = new AtomicLong(0); @@ -517,29 +519,19 @@ public class TsFileManager { } public void addTsFileSet(TsFileSet newSet, long partitionId) { - writeLock("addTsFileSet"); - try { - List<TsFileSet> tsFileSetList = - tsfileSets.computeIfAbsent(partitionId, p -> new ArrayList<>()); - tsFileSetList.add(newSet); - } finally { - writeUnlock(); - } + List<TsFileSet> tsFileSetList = + tsfileSets.computeIfAbsent(partitionId, p -> new CopyOnWriteArrayList<>()); + tsFileSetList.add(newSet); } public List<TsFileSet> getTsFileSet( long partitionId, long minFileVersionIncluded, long maxFileVersionExcluded) { - readLock(); - try { - List<TsFileSet> tsFileSetList = tsfileSets.getOrDefault(partitionId, Collections.emptyList()); - return tsFileSetList.stream() - .filter( - s -> - s.getEndVersion() < maxFileVersionExcluded - && s.getEndVersion() >= minFileVersionIncluded) - .collect(Collectors.toList()); - } finally { - readUnlock(); - } + List<TsFileSet> tsFileSetList = tsfileSets.getOrDefault(partitionId, Collections.emptyList()); + return tsFileSetList.stream() + .filter( + s -> + s.getEndVersion() < maxFileVersionExcluded + && s.getEndVersion() >= minFileVersionIncluded) + .collect(Collectors.toList()); } }
