This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch read_tsfile_table_function
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/read_tsfile_table_function by 
this push:
     new 8587cbd3366 add tests
8587cbd3366 is described below

commit 8587cbd3366987271a61d5ff240b28b3fdc4fe8b
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jun 16 12:04:34 2026 +0800

    add tests
---
 .../recent/IoTDBReadTsFileTableFunctionIT.java     |  42 +++++
 .../operator/source/AlignedSeriesScanUtil.java     |   3 +-
 .../execution/operator/source/FileLoaderUtils.java |   5 +-
 .../relational/ExternalTsFileSeriesScanUtil.java   |   4 +-
 .../readTsFile/ExternalTsFileQueryResource.java    |  53 ++++--
 .../tvf/readTsFile/TsFileSchemaCollector.java      |  24 ++-
 .../buffer/TimeSeriesMetadataCache.java            |   9 +-
 .../ExternalTsFileQueryResourceTest.java           | 193 +++++++++++++++++++++
 8 files changed, 298 insertions(+), 35 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java
index c033491e95b..870a37aedaf 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java
@@ -286,6 +286,48 @@ public class IoTDBReadTsFileTableFunctionIT {
         DATABASE_NAME);
   }
 
+  @Test
+  public void testReadMultipleTsFilesWithConflictingTagAndFieldColumns() 
throws Exception {
+    File tsFile1 = new File(tmpDir, "tag-field-conflict-1.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile1)) {
+      generateTable(writer, "table1", Arrays.asList("shared"), 
Arrays.asList("s1"), 1, 2);
+    }
+    File tsFile2 = new File(tmpDir, "tag-field-conflict-2.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile2)) {
+      generateTable(writer, "table1", new ArrayList<>(), 
Arrays.asList("shared"), 3, 4);
+    }
+
+    tableAssertTestFail(
+        "SELECT * FROM read_tsfile(PATHS => '"
+            + toSqlPath(tsFile1)
+            + ","
+            + toSqlPath(tsFile2)
+            + "', TABLE_NAME => 'table1')",
+        "conflicting categories when merging table schema",
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testReadMultipleTsFilesWithConflictingFieldAndTagColumns() 
throws Exception {
+    File tsFile1 = new File(tmpDir, "field-tag-conflict-1.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile1)) {
+      generateTable(writer, "table1", new ArrayList<>(), 
Arrays.asList("shared"), 1, 2);
+    }
+    File tsFile2 = new File(tmpDir, "field-tag-conflict-2.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile2)) {
+      generateTable(writer, "table1", Arrays.asList("shared"), 
Arrays.asList("s1"), 3, 4);
+    }
+
+    tableAssertTestFail(
+        "SELECT * FROM read_tsfile(PATHS => '"
+            + toSqlPath(tsFile1)
+            + ","
+            + toSqlPath(tsFile2)
+            + "', TABLE_NAME => 'table1')",
+        "conflicting categories when merging table schema",
+        DATABASE_NAME);
+  }
+
   @Test
   public void testReadTsFileWithoutTableNameWhenMultipleTablesExist() throws 
Exception {
     File tsFile = new File(tmpDir, "multiple-tables.tsfile");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
index e80f305e73b..331a5a5b267 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
@@ -38,7 +38,6 @@ import org.apache.tsfile.write.schema.IMeasurementSchema;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class AlignedSeriesScanUtil extends SeriesScanUtil {
@@ -102,7 +101,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
         scanOptions.getGlobalTimeFilter(),
         isSeq,
         ignoreAllNullRows,
-        Optional.empty());
+        null);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
index 79235360cdf..e5761050509 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
@@ -57,7 +57,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -200,7 +199,7 @@ public class FileLoaderUtils {
       Filter globalTimeFilter,
       boolean isSeq,
       boolean ignoreAllNullRows,
-      Optional<long[]> rootMeasurementMetadataIndexNodeOffset)
+      long[] rootMeasurementMetadataIndexNodeOffset)
       throws IOException {
     final long t1 = System.nanoTime();
     boolean loadFromMem = false;
@@ -293,7 +292,7 @@ public class FileLoaderUtils {
       FragmentInstanceContext context,
       Filter globalTimeFilter,
       boolean ignoreAllNullRows,
-      Optional<long[]> rootMeasurementMetadataIndexNodeOffset)
+      long[] rootMeasurementMetadataIndexNodeOffset)
       throws IOException {
     AbstractAlignedTimeSeriesMetadata alignedTimeSeriesMetadata = null;
     // load all the TimeseriesMetadata of vector, the first one is for time 
column and the
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
index c22384160ae..7726a510a14 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
@@ -36,7 +36,6 @@ import org.apache.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
 
 public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil {
 
@@ -84,8 +83,7 @@ public class ExternalTsFileSeriesScanUtil extends 
AlignedSeriesScanUtil {
         globalTimeFilter,
         resource.isSeq(),
         context.isIgnoreAllNullRows(),
-        Optional.of(
-            new long[] {currentDeviceOffset.getStartOffset(), 
currentDeviceOffset.getEndOffset()}));
+        new long[] {currentDeviceOffset.getStartOffset(), 
currentDeviceOffset.getEndOffset()});
   }
 
   @FunctionalInterface
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
index 4a7ce9f89b3..dfb1fd8b625 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
@@ -20,12 +20,14 @@
 package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile;
 
 import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
-import org.apache.iotdb.commons.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
 import org.apache.iotdb.commons.schema.filter.SchemaFilter;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileDeviceFilterVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
@@ -79,7 +81,9 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
   private static final long TSFILE_READER_MEMORY_RESERVE_SIZE_IN_BYTES = 4L * 
1024;
 
   private final QueryId queryId;
-  private final MPPQueryContext queryContext;
+  // This resource outlives the frontend planning phase, whose MPPQueryContext 
memory manager is
+  // released after dispatch. Keep a dedicated manager and release it when 
this resource closes.
+  private final MemoryReservationManager 
externalTsFileResourceMemoryReservationManager;
   private final Path queryTempRoot;
   private final String tableName;
   private final List<String> tsFilePaths;
@@ -98,8 +102,10 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
       List<String> tsFilePaths,
       LongConsumer ioSizeRecorder,
       boolean useExactTempRoot) {
-    this.queryContext = requireNonNull(queryContext, "queryContext is null");
-    this.queryId = queryContext.getQueryId();
+    this.queryId = requireNonNull(queryContext, "queryContext is 
null").getQueryId();
+    this.externalTsFileResourceMemoryReservationManager =
+        new ThreadSafeMemoryReservationManager(
+            queryId, ExternalTsFileQueryResource.class.getName());
     this.queryTempRoot =
         useExactTempRoot
             ? requireNonNull(tempRoot, "tempRoot is null")
@@ -146,7 +152,7 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
   }
 
   private void acquireMemoryForTsFileReaders() {
-    queryContext.reserveMemoryForFrontEndImmediately(
+    externalTsFileResourceMemoryReservationManager.reserveMemoryImmediately(
         tsFilePaths.size() * TSFILE_READER_MEMORY_RESERVE_SIZE_IN_BYTES);
   }
 
@@ -160,6 +166,11 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     }
   }
 
+  @TestOnly
+  public void setDeviceEntryComparator(Comparator<DeviceEntry> 
deviceEntryComparator) {
+    this.deviceEntryComparator = deviceEntryComparator;
+  }
+
   public List<String> getTsFilePaths() {
     return tsFilePaths;
   }
@@ -193,10 +204,14 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     }
     closed = true;
 
-    releaseFileReaderReferences();
+    try {
+      releaseFileReaderReferences();
 
-    if (Files.exists(queryTempRoot)) {
-      FileUtils.deleteFileOrDirectory(queryTempRoot.toFile(), true);
+      if (Files.exists(queryTempRoot)) {
+        FileUtils.deleteFileOrDirectory(queryTempRoot.toFile(), true);
+      }
+    } finally {
+      
externalTsFileResourceMemoryReservationManager.releaseAllReservedMemory();
     }
   }
 
@@ -248,7 +263,7 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     private long reservedBytes;
     private long unreservedBytes;
 
-    private DeviceTaskPartition(int partitionIndex) {
+    DeviceTaskPartition(int partitionIndex) {
       this.partitionIndex = partitionIndex;
     }
 
@@ -260,12 +275,12 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
       return deviceEntryIndexes;
     }
 
-    private void add(DeviceTask deviceTask) {
+    void add(DeviceTask deviceTask) {
       pendingDeviceTasks.add(deviceTask);
       unreservedBytes += deviceTask.ramBytesUsed();
     }
 
-    private void flush() {
+    void flush() {
       if (pendingDeviceTasks.isEmpty()) {
         return;
       }
@@ -315,7 +330,7 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
 
     private boolean reserveUnreservedMemory() {
       try {
-        queryContext.reserveMemoryForFrontEndImmediately(unreservedBytes);
+        
externalTsFileResourceMemoryReservationManager.reserveMemoryImmediately(unreservedBytes);
       } catch (MemoryNotEnoughException e) {
         return false;
       }
@@ -330,7 +345,7 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
 
     private void releaseDeviceTaskMemory() {
       if (reservedBytes != 0) {
-        queryContext.releaseMemoryReservedForFrontEnd(reservedBytes);
+        
externalTsFileResourceMemoryReservationManager.releaseMemoryCumulatively(reservedBytes);
         reservedBytes = 0;
       }
       unreservedBytes = 0;
@@ -340,7 +355,7 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
       return !deviceEntryIndexes.isEmpty();
     }
 
-    private void finish() {
+    void finish() {
       if (pendingDeviceTasks.isEmpty()) {
         return;
       }
@@ -420,7 +435,7 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     private QueryDataSource currentDeviceQueryDataSource;
     private Map<TsFileResource, DeviceOffset> currentDeviceOffsetMap;
 
-    private DeviceTaskRunReader(DeviceTaskPartition partition) throws 
IOException {
+    DeviceTaskRunReader(DeviceTaskPartition partition) throws IOException {
       Comparator<DeviceEntry> comparator = deviceEntryComparator;
       this.usePriorityQueue = comparator != null;
       this.runCursors =
@@ -699,7 +714,7 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     }
   }
 
-  private static class DeviceTask implements Accountable {
+  static class DeviceTask implements Accountable {
 
     private static final long INSTANCE_SIZE =
         RamUsageEstimator.shallowSizeOfInstance(DeviceTask.class);
@@ -707,7 +722,7 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     private final int deviceEntryIndex;
     private final List<DeviceOffset> deviceOffsets;
 
-    private DeviceTask(int deviceEntryIndex, List<DeviceOffset> deviceOffsets) 
{
+    DeviceTask(int deviceEntryIndex, List<DeviceOffset> deviceOffsets) {
       this.deviceEntryIndex = deviceEntryIndex;
       this.deviceOffsets = deviceOffsets;
     }
@@ -738,7 +753,7 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     @Override
     public long ramBytesUsed() {
       return INSTANCE_SIZE
-          + MemoryEstimationHelper.ARRAY_LIST_INSTANCE_SIZE
+          + RamUsageEstimator.shallowSizeOfInstance(ArrayList.class)
           + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER
           + (long) RamUsageEstimator.NUM_BYTES_OBJECT_REF * 
deviceOffsets.size()
           + deviceOffsets.size() * DeviceOffset.INSTANCE_SIZE;
@@ -754,7 +769,7 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     private final long startOffset;
     private final long endOffset;
 
-    private DeviceOffset(int fileIndex, long startOffset, long endOffset) {
+    DeviceOffset(int fileIndex, long startOffset, long endOffset) {
       this.fileIndex = fileIndex;
       this.startOffset = startOffset;
       this.endOffset = endOffset;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/TsFileSchemaCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/TsFileSchemaCollector.java
index aa2ccdeab8b..395b6c9d6ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/TsFileSchemaCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/TsFileSchemaCollector.java
@@ -188,6 +188,7 @@ final class TsFileSchemaCollector {
     private IMeasurementSchema timeColumnSchema;
     private final List<IMeasurementSchema> tagColumnSchemas = new 
ArrayList<>();
     private final Map<String, IMeasurementSchema> fieldColumnSchemaMap = new 
LinkedHashMap<>();
+    private final Map<String, ColumnCategory> columnCategoryMap = new 
LinkedHashMap<>();
 
     private MergedTableSchemaBuilder(String tableName, TableSchema 
tableSchema) {
       this.tableName = tableName.toLowerCase(Locale.ENGLISH);
@@ -202,15 +203,18 @@ final class TsFileSchemaCollector {
       List<ColumnCategory> columnCategories = tableSchema.getColumnTypes();
 
       for (int i = 0; i < columnCategories.size(); i++) {
-        if (columnCategories.get(i) == ColumnCategory.TIME) {
+        ColumnCategory currentCategory = columnCategories.get(i);
+        if (currentCategory == ColumnCategory.TIME) {
           if (currentTimeColumn != null) {
             throw new UDFArgumentNotValidException(
                 "Multiple time columns found when merging table schema for 
table " + tableName);
           }
           currentTimeColumn = columnSchemas.get(i);
-        } else if (columnCategories.get(i) == ColumnCategory.TAG) {
+        } else if (currentCategory == ColumnCategory.TAG) {
+          checkAndRecordColumnCategory(columnSchemas.get(i), currentCategory);
           currentTagColumns.add(columnSchemas.get(i));
-        } else if (columnCategories.get(i) == ColumnCategory.FIELD) {
+        } else if (currentCategory == ColumnCategory.FIELD) {
+          checkAndRecordColumnCategory(columnSchemas.get(i), currentCategory);
           currentFieldColumns.add(columnSchemas.get(i));
         }
       }
@@ -265,6 +269,20 @@ final class TsFileSchemaCollector {
       }
     }
 
+    private void checkAndRecordColumnCategory(
+        IMeasurementSchema columnSchema, ColumnCategory currentCategory) {
+      String columnName = 
columnSchema.getMeasurementName().toLowerCase(Locale.ENGLISH);
+      ColumnCategory existingCategory = columnCategoryMap.get(columnName);
+      if (existingCategory != null && existingCategory != currentCategory) {
+        throw new UDFArgumentNotValidException(
+            "Column "
+                + columnSchema.getMeasurementName()
+                + " has conflicting categories when merging table schema for 
table "
+                + tableName);
+      }
+      columnCategoryMap.putIfAbsent(columnName, currentCategory);
+    }
+
     private TableSchema build() {
       List<IMeasurementSchema> columnSchemas = new ArrayList<>();
       List<ColumnCategory> columnCategories = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
index 1e1df3e3c57..255cce7c738 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
@@ -53,7 +53,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.WeakHashMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -146,7 +145,7 @@ public class TimeSeriesMetadataCache {
       boolean debug,
       QueryContext queryContext)
       throws IOException {
-    return get(filePath, key, allSensors, ignoreNotExists, debug, 
queryContext, Optional.empty());
+    return get(filePath, key, allSensors, ignoreNotExists, debug, 
queryContext, null);
   }
 
   @SuppressWarnings({"squid:S1860", "squid:S6541", "squid:S3776"}) // Suppress 
synchronize warning
@@ -157,7 +156,7 @@ public class TimeSeriesMetadataCache {
       boolean ignoreNotExists,
       boolean debug,
       QueryContext queryContext,
-      Optional<long[]> deviceMetadataIndexNodeOffset)
+      long[] deviceMetadataIndexNodeOffset)
       throws IOException {
     long startTime = System.nanoTime();
     long loadBloomFilterTime = 0;
@@ -176,7 +175,7 @@ public class TimeSeriesMetadataCache {
         TsFileSequenceReader reader =
             FileReaderManager.getInstance()
                 .get(filePath, key.tsFileID, true, bloomFilterIoSizeRecorder, 
externalTsFile);
-        if (!deviceMetadataIndexNodeOffset.isPresent()) {
+        if (deviceMetadataIndexNodeOffset == null) {
           BloomFilter bloomFilter = 
reader.readBloomFilter(bloomFilterIoSizeRecorder);
           
queryContext.getQueryStatistics().getLoadBloomFilterFromDiskCount().incrementAndGet();
           if (bloomFilter != null
@@ -218,7 +217,7 @@ public class TimeSeriesMetadataCache {
           if (timeseriesMetadata == null) {
             cacheHit = false;
 
-            if (!deviceMetadataIndexNodeOffset.isPresent()) {
+            if (deviceMetadataIndexNodeOffset == null) {
               long loadBloomFilterStartTime = System.nanoTime();
               // bloom filter part
               BloomFilter bloomFilter =
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResourceTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResourceTest.java
new file mode 100644
index 00000000000..c1db965364d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResourceTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile;
+
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceOffset;
+import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceTask;
+import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceTaskPartition;
+import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceTaskRunReader;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Binary;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ExternalTsFileQueryResourceTest {
+
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private ExternalTsFileQueryResource resource;
+
+  @After
+  public void tearDown() {
+    if (resource != null) {
+      resource.close();
+    }
+  }
+
+  @Test
+  public void testDeviceTaskRunReaderMergesMultipleRunsWithComparator() throws 
Exception {
+    resource = newResource("merge_comparator", Arrays.asList("file-0.tsfile", 
"file-1.tsfile"));
+    addDevices("d1", "d2", "d3", "d4", "d5");
+    
resource.setDeviceEntryComparator(Comparator.comparing(DeviceEntry::getDeviceID));
+    DeviceTaskPartition partition = resource.new DeviceTaskPartition(0);
+
+    partition.add(task(2, offset(0, 30, 39)));
+    partition.add(task(4, offset(1, 50, 59)));
+    partition.flush();
+    partition.add(task(0, offset(0, 10, 19)));
+    partition.add(task(3, offset(1, 40, 49)));
+    partition.flush();
+    partition.add(task(1, offset(0, 20, 29)));
+    partition.flush();
+
+    try (DeviceTaskRunReader reader = resource.new 
DeviceTaskRunReader(partition)) {
+      assertDeviceOrder(reader, "d1", "d2", "d3", "d4", "d5");
+    }
+  }
+
+  @Test
+  public void testDeviceTaskRunReaderReadsRunsInFifoOrderWithoutComparator() 
throws Exception {
+    resource = newResource("fifo", Collections.singletonList("file-0.tsfile"));
+    addDevices("d1", "d2", "d3", "d4");
+    DeviceTaskPartition partition = resource.new DeviceTaskPartition(0);
+
+    partition.add(task(2, offset(0, 30, 39)));
+    partition.flush();
+    partition.add(task(0, offset(0, 10, 19)));
+    partition.add(task(1, offset(0, 20, 29)));
+    partition.flush();
+    partition.add(task(3, offset(0, 40, 49)));
+    partition.flush();
+
+    try (DeviceTaskRunReader reader = resource.new 
DeviceTaskRunReader(partition)) {
+      assertDeviceOrder(reader, "d3", "d1", "d2", "d4");
+    }
+  }
+
+  @Test
+  public void 
testDeviceTaskRunReaderReadsDiskRunBeforePendingMemoryTasksWithoutComparator()
+      throws Exception {
+    resource = newResource("disk_memory", 
Collections.singletonList("file-0.tsfile"));
+    addDevices("d1", "d2", "d3");
+    DeviceTaskPartition partition = resource.new DeviceTaskPartition(0);
+
+    partition.add(task(1, offset(0, 20, 29)));
+    partition.flush();
+    partition.add(task(0, offset(0, 10, 19)));
+    partition.add(task(2, offset(0, 30, 39)));
+    partition.finish();
+
+    try (DeviceTaskRunReader reader = resource.new 
DeviceTaskRunReader(partition)) {
+      assertDeviceOrder(reader, "d2", "d1", "d3");
+    }
+  }
+
+  @Test
+  public void testDeviceTaskRunReaderUsesSharedTsFileResourceAsOffsetMapKey() 
throws Exception {
+    resource = newResource("offset_map", Arrays.asList("file-0.tsfile", 
"file-1.tsfile"));
+    addDevices("d1");
+    DeviceTaskPartition partition = resource.new DeviceTaskPartition(0);
+    partition.add(task(0, offset(0, 11, 22), offset(1, 33, 44)));
+    partition.finish();
+
+    try (DeviceTaskRunReader reader = resource.new 
DeviceTaskRunReader(partition)) {
+      assertTrue(reader.nextDevice());
+      Map<TsFileResource, DeviceOffset> offsetMap = 
reader.getCurrentDeviceOffsetMap();
+      List<TsFileResource> sharedResources = 
resource.getSharedTsFileResources();
+
+      assertEquals(2, offsetMap.size());
+      assertTrue(offsetMap.containsKey(sharedResources.get(0)));
+      assertTrue(offsetMap.containsKey(sharedResources.get(1)));
+      assertOffset(offsetMap.get(sharedResources.get(0)), 0, 11, 22);
+      assertOffset(offsetMap.get(sharedResources.get(1)), 1, 33, 44);
+      assertEquals(sharedResources, 
reader.getCurrentDeviceQueryDataSource().getUnseqResources());
+      assertFalse(reader.nextDevice());
+    }
+  }
+
+  private ExternalTsFileQueryResource newResource(String queryId, List<String> 
fileNames)
+      throws Exception {
+    File root = temporaryFolder.newFolder(queryId);
+    List<String> tsFilePaths = new ArrayList<>(fileNames.size());
+    for (String fileName : fileNames) {
+      tsFilePaths.add(new File(root, fileName).getAbsolutePath());
+    }
+    MPPQueryContext queryContext = new MPPQueryContext(new QueryId(queryId));
+    return new ExternalTsFileQueryResource(
+        queryContext, root.toPath().resolve("tmp"), "table1", tsFilePaths, 
ignored -> {}, true);
+  }
+
+  private void addDevices(String... deviceNames) {
+    for (String deviceName : deviceNames) {
+      resource
+          .getSharedDeviceEntries()
+          .add(
+              new AlignedDeviceEntry(
+                  IDeviceID.Factory.DEFAULT_FACTORY.create(deviceName), new 
Binary[0]));
+    }
+  }
+
+  private DeviceTask task(int deviceEntryIndex, DeviceOffset... offsets) {
+    return new DeviceTask(deviceEntryIndex, Arrays.asList(offsets));
+  }
+
+  private DeviceOffset offset(int fileIndex, long startOffset, long endOffset) 
{
+    return new DeviceOffset(fileIndex, startOffset, endOffset);
+  }
+
+  private void assertDeviceOrder(DeviceTaskRunReader reader, String... 
expectedDeviceNames)
+      throws Exception {
+    for (String expectedDeviceName : expectedDeviceNames) {
+      assertTrue(reader.nextDevice());
+      assertEquals(expectedDeviceName, 
reader.getCurrentDevice().getDeviceID().toString());
+    }
+    assertFalse(reader.nextDevice());
+  }
+
+  private void assertOffset(
+      DeviceOffset offset,
+      int expectedFileIndex,
+      long expectedStartOffset,
+      long expectedEndOffset) {
+    assertEquals(expectedFileIndex, offset.getFileIndex());
+    assertEquals(expectedStartOffset, offset.getStartOffset());
+    assertEquals(expectedEndOffset, offset.getEndOffset());
+  }
+}

Reply via email to