This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch force_ci/support_schema_evolution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ff692abfb86eabf27f8023c6e39fbc8427422111 Author: Tian Jiang <[email protected]> AuthorDate: Wed Jan 21 09:31:15 2026 +0800 improve efficiency --- .../iotdb/relational/it/schema/IoTDBTableIT.java | 16 +++--- .../db/storageengine/dataregion/DataRegion.java | 57 +++++++++++++++++++--- .../dataregion/read/QueryDataSource.java | 4 +- .../dataregion/tsfile/TsFileManager.java | 27 ++++++---- .../dataregion/tsfile/TsFileResource.java | 20 ++++++-- .../dataregion/tsfile/fileset/TsFileSet.java | 24 +++++++++ 6 files changed, 117 insertions(+), 31 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java index cf1bc59c776..eea64c75022 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java @@ -1748,14 +1748,14 @@ public class IoTDBTableIT { final ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { final String db = "perfquotedb"; final int colPerTable = 100; - final int tables = 3200; + final int tables = 1600; final int rows = 100; final int numFile = 5; - final int runs = 30; + final int runs = 10; stmt.execute("DROP DATABASE IF EXISTS " + db); stmt.execute("CREATE DATABASE IF NOT EXISTS " + db); stmt.execute("USE " + db); - // stmt.execute("set configuration enable_seq_space_compaction='false'"); + stmt.execute("set configuration enable_seq_space_compaction='false'"); session.executeNonQueryStatement("USE " + db); final String[] names = new String[tables]; @@ -1811,9 +1811,8 @@ public class IoTDBTableIT { final long start = System.nanoTime(); for (int i = 0; i < tables; i++) { try (final ResultSet rs = stmt.executeQuery("SELECT count(*) FROM " + names[i])) { - if (rs.next()) { - rs.getLong(1); - } + assertTrue(rs.next()); + assertEquals(rows * numFile, rs.getLong(1)); } } final long end = System.nanoTime(); @@ -1837,9 +1836,8 @@ public class IoTDBTableIT { final long start = System.nanoTime(); for (int i = 0; i < tables; i++) { try (final ResultSet rs = stmt.executeQuery("SELECT count(*) FROM " + names[i])) { - if (rs.next()) { - rs.getLong(1); - } + assertTrue(rs.next()); + assertEquals(rows * numFile, rs.getLong(1)); } } final long end = System.nanoTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 4124626c64a..cfe711d7a21 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -2607,7 +2607,8 @@ public class DataRegion implements IDataRegionForQuery { for (TsFileResource tsFileResource : seqResources) { // only need to acquire flush lock for those unclosed and satisfied tsfile if (!tsFileResource.isClosed() - && tsFileResource.isSatisfied(singleDeviceId, globalTimeFilter, true, isDebug)) { + && tsFileResource.isFinalDeviceIdSatisfied( + singleDeviceId, globalTimeFilter, true, isDebug)) { TsFileProcessor tsFileProcessor = tsFileResource.getProcessor(); try { if (tsFileProcessor == null) { @@ -2654,7 +2655,8 @@ public class DataRegion implements IDataRegionForQuery { // deal with unSeq resources for (TsFileResource tsFileResource : unSeqResources) { if (!tsFileResource.isClosed() - && tsFileResource.isSatisfied(singleDeviceId, globalTimeFilter, false, isDebug)) { + && tsFileResource.isFinalDeviceIdSatisfied( + singleDeviceId, globalTimeFilter, false, isDebug)) { TsFileProcessor tsFileProcessor = tsFileResource.getProcessor(); try { if (tsFileProcessor == null) { @@ -2769,7 +2771,8 @@ public class DataRegion implements IDataRegionForQuery { List<IFileScanHandle> fileScanHandles = new ArrayList<>(); for (TsFileResource tsFileResource : tsFileResources) { - if (!tsFileResource.isSatisfied(null, globalTimeFilter, isSeq, context.isDebug())) { + if (!tsFileResource.isFinalDeviceIdSatisfied( + null, globalTimeFilter, isSeq, context.isDebug())) { continue; } if (tsFileResource.isClosed()) { @@ -2847,7 +2850,8 @@ public class DataRegion implements IDataRegionForQuery { List<IFileScanHandle> fileScanHandles = new ArrayList<>(); for (TsFileResource tsFileResource : tsFileResources) { - if (!tsFileResource.isSatisfied(null, globalTimeFilter, isSeq, context.isDebug())) { + if (!tsFileResource.isFinalDeviceIdSatisfied( + null, globalTimeFilter, isSeq, context.isDebug())) { continue; } if (tsFileResource.isClosed()) { @@ -2930,7 +2934,7 @@ public class DataRegion implements IDataRegionForQuery { */ @SuppressWarnings("SuspiciousSystemArraycopy") private List<TsFileResource> getFileResourceListForQuery( - Collection<TsFileResource> tsFileResources, + List<TsFileResource> tsFileResources, List<IFullPath> pathList, IDeviceID singleDeviceId, QueryContext context, @@ -2940,8 +2944,49 @@ public class DataRegion implements IDataRegionForQuery { List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); + List<TsFileSet> tsFileSets = Collections.emptyList(); + int tsFileSetsIndex = 0; + Long currentTimePartitionId = null; + EvolvedSchema currentEvolvedSchema; + IDeviceID originalDeviceId = singleDeviceId; + for (TsFileResource tsFileResource : tsFileResources) { - if (!tsFileResource.isSatisfied(singleDeviceId, globalTimeFilter, isSeq, context.isDebug())) { + long fileTimePartition = tsFileResource.getTimePartition(); + // update TsFileSets if time partition changes + boolean tsFileSetsChanged = false; + if (currentTimePartitionId == null || currentTimePartitionId != fileTimePartition) { + currentTimePartitionId = fileTimePartition; + tsFileSets = tsFileManager.getTsFileSet(fileTimePartition); + tsFileSetsIndex = 0; + tsFileSetsChanged = true; + originalDeviceId = singleDeviceId; + } + // find TsFileSets this file belongs to + while (tsFileSetsIndex < tsFileSets.size()) { + TsFileSet tsFileSet = tsFileSets.get(tsFileSetsIndex); + if (tsFileSet.contains(tsFileResource)) { + break; + } else { + tsFileSetsChanged = true; + tsFileSetsIndex++; + } + } + // if TsFileSets change, update EvolvedSchema + if (tsFileSetsChanged && tsFileSetsIndex < tsFileSets.size()) { + currentEvolvedSchema = + TsFileSet.getMergedEvolvedSchema( + tsFileSets.subList(tsFileSetsIndex, tsFileSets.size())); + // use EvolvedSchema to rewrite deviceId to original deviceId + if (currentEvolvedSchema != null) { + originalDeviceId = currentEvolvedSchema.rewriteToOriginal(singleDeviceId); + } else { + originalDeviceId = singleDeviceId; + } + } + + // reuse the deviceId to avoid rewriting again or reading EvolvedSchema unnecessarily + if (!tsFileResource.isOriginalDeviceIdSatisfied( + originalDeviceId, globalTimeFilter, isSeq, context.isDebug())) { continue; } try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java index 9bdbe1c4932..8c95847788e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java @@ -142,7 +142,7 @@ public class QueryDataSource implements IQueryDataSource { curSeqSatisfied = tsFileResource != null && (isSingleDevice - || tsFileResource.isSatisfied( + || tsFileResource.isFinalDeviceIdSatisfied( deviceID, timeFilter, true, debug, maxTsFileSetEndVersion)); } @@ -194,7 +194,7 @@ public class QueryDataSource implements IQueryDataSource { curUnSeqSatisfied = tsFileResource != null && (isSingleDevice - || tsFileResource.isSatisfied( + || tsFileResource.isFinalDeviceIdSatisfied( deviceID, timeFilter, false, debug, maxTsFileSetEndVersion)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java index fe45bd7f1ec..1e4f31d1894 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java @@ -19,8 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.storageengine.dataregion.modification.ModFileManagement; @@ -41,11 +39,12 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; public class TsFileManager { private final String storageGroupName; @@ -524,14 +523,24 @@ public class TsFileManager { tsFileSetList.add(newSet); } + public List<TsFileSet> getTsFileSet(long partitionId) { + return getTsFileSet(partitionId, Long.MIN_VALUE, Long.MAX_VALUE); + } + public List<TsFileSet> getTsFileSet( long partitionId, long minFileVersionIncluded, long maxFileVersionExcluded) { List<TsFileSet> tsFileSetList = tsfileSets.getOrDefault(partitionId, Collections.emptyList()); - return tsFileSetList.stream() - .filter( - s -> - s.getEndVersion() < maxFileVersionExcluded - && s.getEndVersion() >= minFileVersionIncluded) - .collect(Collectors.toList()); + int start = 0, end = tsFileSetList.size(); + for (int i = 0, tsFileSetListSize = tsFileSetList.size(); i < tsFileSetListSize; i++) { + TsFileSet tsFileSet = tsFileSetList.get(i); + if (tsFileSet.getEndVersion() < minFileVersionIncluded) { + start = i + 1; + } + if (tsFileSet.getEndVersion() >= maxFileVersionExcluded) { + end = i; + break; + } + } + return start < end ? tsFileSetList.subList(start, end) : Collections.emptyList(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index c8d4239714f..ab2ae700bc9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -1053,16 +1053,16 @@ public class TsFileResource implements PersistentResource, Cloneable { /** * @param deviceId IDeviceId after schema evolution */ - public boolean isSatisfied(IDeviceID deviceId, Filter timeFilter, boolean isSeq, boolean debug) { - return isSatisfied(deviceId, timeFilter, isSeq, debug, Long.MAX_VALUE); + public boolean isFinalDeviceIdSatisfied( + IDeviceID deviceId, Filter timeFilter, boolean isSeq, boolean debug) { + return isFinalDeviceIdSatisfied(deviceId, timeFilter, isSeq, debug, Long.MAX_VALUE); } /** * @param deviceId the IDeviceID after schema evolution * @return true if the device is contained in the TsFile */ - @SuppressWarnings("OptionalGetWithoutIsPresent") - public boolean isSatisfied( + public boolean isFinalDeviceIdSatisfied( IDeviceID deviceId, Filter timeFilter, boolean isSeq, @@ -1072,6 +1072,16 @@ public class TsFileResource implements PersistentResource, Cloneable { if (evolvedSchema != null) { deviceId = evolvedSchema.rewriteToOriginal(deviceId); } + return isOriginalDeviceIdSatisfied(deviceId, timeFilter, isSeq, debug); + } + + /** + * @param deviceId the IDeviceID before schema evolution + * @return true if the device is contained in the TsFile + */ + @SuppressWarnings("OptionalGetWithoutIsPresent") + public boolean isOriginalDeviceIdSatisfied( + IDeviceID deviceId, Filter timeFilter, boolean isSeq, boolean debug) { if (deviceId != null && definitelyNotContains(deviceId)) { if (debug) { DEBUG_LOGGER.info( @@ -1718,7 +1728,7 @@ public class TsFileResource implements PersistentResource, Cloneable { List<TsFileSet> tsFileSets = getTsFileSets(); for (TsFileSet fileSet : tsFileSets) { if (fileSet.getEndVersion() >= excludedMaxFileVersion) { - continue; + break; } try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java index 676f8c4a3fd..c3230c6297c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java @@ -19,21 +19,27 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchemaCache; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolutionFile; import org.apache.tsfile.external.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock; /** TsFileSet represents a set of TsFiles in a time partition whose version <= endVersion. */ public class TsFileSet implements Comparable<TsFileSet> { + private static final Logger LOGGER = LoggerFactory.getLogger(TsFileSet.class); public static final String FILE_SET_DIR_NAME = "filesets"; private final long endVersion; @@ -133,4 +139,22 @@ public class TsFileSet implements Comparable<TsFileSet> { public void remove() { FileUtils.deleteQuietly(fileSetDir); } + + public boolean contains(TsFileResource tsFileResource) { + return tsFileResource.getVersion() <= endVersion; + } + + public static EvolvedSchema getMergedEvolvedSchema(List<TsFileSet> tsFileSetList) { + List<EvolvedSchema> list = new ArrayList<>(); + for (TsFileSet fileSet : tsFileSetList) { + try { + EvolvedSchema readEvolvedSchema = fileSet.readEvolvedSchema(); + list.add(readEvolvedSchema); + } catch (IOException e) { + LOGGER.warn("Cannot read evolved schema from {}, skipping it", fileSet); + } + } + + return EvolvedSchema.merge(list.toArray(new EvolvedSchema[0])); + } }
