This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch force_ci/support_schema_evolution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ff692abfb86eabf27f8023c6e39fbc8427422111
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Jan 21 09:31:15 2026 +0800

    improve efficiency
---
 .../iotdb/relational/it/schema/IoTDBTableIT.java   | 16 +++---
 .../db/storageengine/dataregion/DataRegion.java    | 57 +++++++++++++++++++---
 .../dataregion/read/QueryDataSource.java           |  4 +-
 .../dataregion/tsfile/TsFileManager.java           | 27 ++++++----
 .../dataregion/tsfile/TsFileResource.java          | 20 ++++++--
 .../dataregion/tsfile/fileset/TsFileSet.java       | 24 +++++++++
 6 files changed, 117 insertions(+), 31 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java
index cf1bc59c776..eea64c75022 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java
@@ -1748,14 +1748,14 @@ public class IoTDBTableIT {
         final ITableSession session = 
EnvFactory.getEnv().getTableSessionConnection()) {
       final String db = "perfquotedb";
       final int colPerTable = 100;
-      final int tables = 3200;
+      final int tables = 1600;
       final int rows = 100;
       final int numFile = 5;
-      final int runs = 30;
+      final int runs = 10;
       stmt.execute("DROP DATABASE IF EXISTS " + db);
       stmt.execute("CREATE DATABASE IF NOT EXISTS " + db);
       stmt.execute("USE " + db);
-      // stmt.execute("set configuration enable_seq_space_compaction='false'");
+      stmt.execute("set configuration enable_seq_space_compaction='false'");
       session.executeNonQueryStatement("USE " + db);
 
       final String[] names = new String[tables];
@@ -1811,9 +1811,8 @@ public class IoTDBTableIT {
         final long start = System.nanoTime();
         for (int i = 0; i < tables; i++) {
           try (final ResultSet rs = stmt.executeQuery("SELECT count(*) FROM " 
+ names[i])) {
-            if (rs.next()) {
-              rs.getLong(1);
-            }
+            assertTrue(rs.next());
+            assertEquals(rows * numFile, rs.getLong(1));
           }
         }
         final long end = System.nanoTime();
@@ -1837,9 +1836,8 @@ public class IoTDBTableIT {
         final long start = System.nanoTime();
         for (int i = 0; i < tables; i++) {
           try (final ResultSet rs = stmt.executeQuery("SELECT count(*) FROM " 
+ names[i])) {
-            if (rs.next()) {
-              rs.getLong(1);
-            }
+            assertTrue(rs.next());
+            assertEquals(rows * numFile, rs.getLong(1));
           }
         }
         final long end = System.nanoTime();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 4124626c64a..cfe711d7a21 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2607,7 +2607,8 @@ public class DataRegion implements IDataRegionForQuery {
     for (TsFileResource tsFileResource : seqResources) {
       // only need to acquire flush lock for those unclosed and satisfied 
tsfile
       if (!tsFileResource.isClosed()
-          && tsFileResource.isSatisfied(singleDeviceId, globalTimeFilter, 
true, isDebug)) {
+          && tsFileResource.isFinalDeviceIdSatisfied(
+              singleDeviceId, globalTimeFilter, true, isDebug)) {
         TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
         try {
           if (tsFileProcessor == null) {
@@ -2654,7 +2655,8 @@ public class DataRegion implements IDataRegionForQuery {
     // deal with unSeq resources
     for (TsFileResource tsFileResource : unSeqResources) {
       if (!tsFileResource.isClosed()
-          && tsFileResource.isSatisfied(singleDeviceId, globalTimeFilter, 
false, isDebug)) {
+          && tsFileResource.isFinalDeviceIdSatisfied(
+              singleDeviceId, globalTimeFilter, false, isDebug)) {
         TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
         try {
           if (tsFileProcessor == null) {
@@ -2769,7 +2771,8 @@ public class DataRegion implements IDataRegionForQuery {
     List<IFileScanHandle> fileScanHandles = new ArrayList<>();
 
     for (TsFileResource tsFileResource : tsFileResources) {
-      if (!tsFileResource.isSatisfied(null, globalTimeFilter, isSeq, 
context.isDebug())) {
+      if (!tsFileResource.isFinalDeviceIdSatisfied(
+          null, globalTimeFilter, isSeq, context.isDebug())) {
         continue;
       }
       if (tsFileResource.isClosed()) {
@@ -2847,7 +2850,8 @@ public class DataRegion implements IDataRegionForQuery {
     List<IFileScanHandle> fileScanHandles = new ArrayList<>();
 
     for (TsFileResource tsFileResource : tsFileResources) {
-      if (!tsFileResource.isSatisfied(null, globalTimeFilter, isSeq, 
context.isDebug())) {
+      if (!tsFileResource.isFinalDeviceIdSatisfied(
+          null, globalTimeFilter, isSeq, context.isDebug())) {
         continue;
       }
       if (tsFileResource.isClosed()) {
@@ -2930,7 +2934,7 @@ public class DataRegion implements IDataRegionForQuery {
    */
   @SuppressWarnings("SuspiciousSystemArraycopy")
   private List<TsFileResource> getFileResourceListForQuery(
-      Collection<TsFileResource> tsFileResources,
+      List<TsFileResource> tsFileResources,
       List<IFullPath> pathList,
       IDeviceID singleDeviceId,
       QueryContext context,
@@ -2940,8 +2944,49 @@ public class DataRegion implements IDataRegionForQuery {
 
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
 
+    List<TsFileSet> tsFileSets = Collections.emptyList();
+    int tsFileSetsIndex = 0;
+    Long currentTimePartitionId = null;
+    EvolvedSchema currentEvolvedSchema;
+    IDeviceID originalDeviceId = singleDeviceId;
+
     for (TsFileResource tsFileResource : tsFileResources) {
-      if (!tsFileResource.isSatisfied(singleDeviceId, globalTimeFilter, isSeq, 
context.isDebug())) {
+      long fileTimePartition = tsFileResource.getTimePartition();
+      // update TsFileSets if time partition changes
+      boolean tsFileSetsChanged = false;
+      if (currentTimePartitionId == null || currentTimePartitionId != 
fileTimePartition) {
+        currentTimePartitionId = fileTimePartition;
+        tsFileSets = tsFileManager.getTsFileSet(fileTimePartition);
+        tsFileSetsIndex = 0;
+        tsFileSetsChanged = true;
+        originalDeviceId = singleDeviceId;
+      }
+      // find TsFileSets this file belongs to
+      while (tsFileSetsIndex < tsFileSets.size()) {
+        TsFileSet tsFileSet = tsFileSets.get(tsFileSetsIndex);
+        if (tsFileSet.contains(tsFileResource)) {
+          break;
+        } else {
+          tsFileSetsChanged = true;
+          tsFileSetsIndex++;
+        }
+      }
+      // if TsFileSets change, update EvolvedSchema
+      if (tsFileSetsChanged && tsFileSetsIndex < tsFileSets.size()) {
+        currentEvolvedSchema =
+            TsFileSet.getMergedEvolvedSchema(
+                tsFileSets.subList(tsFileSetsIndex, tsFileSets.size()));
+        // use EvolvedSchema to rewrite deviceId to original deviceId
+        if (currentEvolvedSchema != null) {
+          originalDeviceId = 
currentEvolvedSchema.rewriteToOriginal(singleDeviceId);
+        } else {
+          originalDeviceId = singleDeviceId;
+        }
+      }
+
+      // reuse the deviceId to avoid rewriting again or reading EvolvedSchema 
unnecessarily
+      if (!tsFileResource.isOriginalDeviceIdSatisfied(
+          originalDeviceId, globalTimeFilter, isSeq, context.isDebug())) {
         continue;
       }
       try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
index 9bdbe1c4932..8c95847788e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
@@ -142,7 +142,7 @@ public class QueryDataSource implements IQueryDataSource {
       curSeqSatisfied =
           tsFileResource != null
               && (isSingleDevice
-                  || tsFileResource.isSatisfied(
+                  || tsFileResource.isFinalDeviceIdSatisfied(
                       deviceID, timeFilter, true, debug, 
maxTsFileSetEndVersion));
     }
 
@@ -194,7 +194,7 @@ public class QueryDataSource implements IQueryDataSource {
       curUnSeqSatisfied =
           tsFileResource != null
               && (isSingleDevice
-                  || tsFileResource.isSatisfied(
+                  || tsFileResource.isFinalDeviceIdSatisfied(
                       deviceID, timeFilter, false, debug, 
maxTsFileSetEndVersion));
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index fe45bd7f1ec..1e4f31d1894 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile;
 
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModFileManagement;
@@ -41,11 +39,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 
 public class TsFileManager {
   private final String storageGroupName;
@@ -524,14 +523,24 @@ public class TsFileManager {
     tsFileSetList.add(newSet);
   }
 
+  public List<TsFileSet> getTsFileSet(long partitionId) {
+    return getTsFileSet(partitionId, Long.MIN_VALUE, Long.MAX_VALUE);
+  }
+
   public List<TsFileSet> getTsFileSet(
       long partitionId, long minFileVersionIncluded, long 
maxFileVersionExcluded) {
     List<TsFileSet> tsFileSetList = tsfileSets.getOrDefault(partitionId, 
Collections.emptyList());
-    return tsFileSetList.stream()
-        .filter(
-            s ->
-                s.getEndVersion() < maxFileVersionExcluded
-                    && s.getEndVersion() >= minFileVersionIncluded)
-        .collect(Collectors.toList());
+    int start = 0, end = tsFileSetList.size();
+    for (int i = 0, tsFileSetListSize = tsFileSetList.size(); i < 
tsFileSetListSize; i++) {
+      TsFileSet tsFileSet = tsFileSetList.get(i);
+      if (tsFileSet.getEndVersion() < minFileVersionIncluded) {
+        start = i + 1;
+      }
+      if (tsFileSet.getEndVersion() >= maxFileVersionExcluded) {
+        end = i;
+        break;
+      }
+    }
+    return start < end ? tsFileSetList.subList(start, end) : 
Collections.emptyList();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index c8d4239714f..ab2ae700bc9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -1053,16 +1053,16 @@ public class TsFileResource implements 
PersistentResource, Cloneable {
   /**
    * @param deviceId IDeviceId after schema evolution
    */
-  public boolean isSatisfied(IDeviceID deviceId, Filter timeFilter, boolean 
isSeq, boolean debug) {
-    return isSatisfied(deviceId, timeFilter, isSeq, debug, Long.MAX_VALUE);
+  public boolean isFinalDeviceIdSatisfied(
+      IDeviceID deviceId, Filter timeFilter, boolean isSeq, boolean debug) {
+    return isFinalDeviceIdSatisfied(deviceId, timeFilter, isSeq, debug, 
Long.MAX_VALUE);
   }
 
   /**
    * @param deviceId the IDeviceID after schema evolution
    * @return true if the device is contained in the TsFile
    */
-  @SuppressWarnings("OptionalGetWithoutIsPresent")
-  public boolean isSatisfied(
+  public boolean isFinalDeviceIdSatisfied(
       IDeviceID deviceId,
       Filter timeFilter,
       boolean isSeq,
@@ -1072,6 +1072,16 @@ public class TsFileResource implements 
PersistentResource, Cloneable {
     if (evolvedSchema != null) {
       deviceId = evolvedSchema.rewriteToOriginal(deviceId);
     }
+    return isOriginalDeviceIdSatisfied(deviceId, timeFilter, isSeq, debug);
+  }
+
+  /**
+   * @param deviceId the IDeviceID before schema evolution
+   * @return true if the device is contained in the TsFile
+   */
+  @SuppressWarnings("OptionalGetWithoutIsPresent")
+  public boolean isOriginalDeviceIdSatisfied(
+      IDeviceID deviceId, Filter timeFilter, boolean isSeq, boolean debug) {
     if (deviceId != null && definitelyNotContains(deviceId)) {
       if (debug) {
         DEBUG_LOGGER.info(
@@ -1718,7 +1728,7 @@ public class TsFileResource implements 
PersistentResource, Cloneable {
     List<TsFileSet> tsFileSets = getTsFileSets();
     for (TsFileSet fileSet : tsFileSets) {
       if (fileSet.getEndVersion() >= excludedMaxFileVersion) {
-        continue;
+        break;
       }
 
       try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
index 676f8c4a3fd..c3230c6297c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
@@ -19,21 +19,27 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset;
 
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchemaCache;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolutionFile;
 
 import org.apache.tsfile.external.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /** TsFileSet represents a set of TsFiles in a time partition whose version <= 
endVersion. */
 public class TsFileSet implements Comparable<TsFileSet> {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TsFileSet.class);
   public static final String FILE_SET_DIR_NAME = "filesets";
 
   private final long endVersion;
@@ -133,4 +139,22 @@ public class TsFileSet implements Comparable<TsFileSet> {
   public void remove() {
     FileUtils.deleteQuietly(fileSetDir);
   }
+
+  public boolean contains(TsFileResource tsFileResource) {
+    return tsFileResource.getVersion() <= endVersion;
+  }
+
+  public static EvolvedSchema getMergedEvolvedSchema(List<TsFileSet> 
tsFileSetList) {
+    List<EvolvedSchema> list = new ArrayList<>();
+    for (TsFileSet fileSet : tsFileSetList) {
+      try {
+        EvolvedSchema readEvolvedSchema = fileSet.readEvolvedSchema();
+        list.add(readEvolvedSchema);
+      } catch (IOException e) {
+        LOGGER.warn("Cannot read evolved schema from {}, skipping it", 
fileSet);
+      }
+    }
+
+    return EvolvedSchema.merge(list.toArray(new EvolvedSchema[0]));
+  }
 }

Reply via email to