This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch read_tsfile_table_function
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 194cd3076122a636aecb1e4c34bb81e5bde52745
Author: shuwenwei <[email protected]>
AuthorDate: Wed Jun 10 17:59:34 2026 +0800

    fix bug
---
 .../java/org/apache/iotdb/it/env/EnvFactory.java   |   2 +-
 .../recent/IoTDBReadTsFileTableFunctionIT.java     | 388 +++++++++++++++++++++
 .../fragment/FragmentInstanceContext.java          |  79 ++---
 .../relational/AbstractAggTableScanOperator.java   |  42 +--
 .../AbstractDeviceTableScanOperator.java           |  78 -----
 .../relational/AbstractTableScanOperator.java      |  50 ++-
 .../ExternalTsFileAggTableScanOperator.java        | 112 +++---
 .../relational/ExternalTsFileSeriesScanUtil.java   |  21 +-
 .../ExternalTsFileTableScanOperator.java           | 104 +++---
 .../source/relational/TableScanOperator.java       |   2 +-
 .../TreeAlignedDeviceViewScanOperator.java         |   2 +-
 .../planner/DataNodeTableOperatorGenerator.java    |  27 +-
 .../readTsFile/ExternalTsFileQueryDataSource.java  |  67 ++++
 .../readTsFile/ExternalTsFileQueryResource.java    | 278 +++++++++------
 .../distribute/TableDistributedPlanGenerator.java  |  27 +-
 15 files changed, 864 insertions(+), 415 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java
index 2fbc9672837..2855bec5b7b 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java
@@ -43,7 +43,7 @@ public class EnvFactory {
       try {
         Class.forName(Config.JDBC_DRIVER_NAME);
         logger.info(">>>>>>>{}", System.getProperty("TestEnv"));
-        EnvType envType = EnvType.getSystemEnvType();
+        EnvType envType = EnvType.Remote;
         switch (envType) {
           case Simple:
           case TABLE_SIMPLE:
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java
new file mode 100644
index 00000000000..ed236b901ac
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.query.recent;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.record.Tablet;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail;
+import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBReadTsFileTableFunctionIT {
+  private static final String DATABASE_NAME = "test_read_tsfile";
+
+  private static File tmpDir;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initClusterEnvironment();
+    try (Connection connection = EnvFactory.getEnv().getTableConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE " + DATABASE_NAME);
+    }
+  }
+
+  @Before
+  public void setUpBeforeTest() throws IOException {
+    tmpDir = new File(Files.createTempDirectory("read-tsfile").toUri());
+  }
+
+  @After
+  public void tearDownAfterTest() {
+    deleteTmpDir();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testReadSingleTsFile() throws Exception {
+    File tsFile = new File(tmpDir, "single.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile)) {
+      generateTable(
+          writer, "table1", Arrays.asList("tag1", "tag2"), Arrays.asList("s1", "s2"), 1, 2);
+      generateTable(writer, "table2", Arrays.asList("tag1"), Arrays.asList("s1"), 1, 2);
+    }
+
+    String[] expectedHeader = new String[] {"time", "tag1", "tag2", "s1", "s2"};
+    String[] retArray =
+        new String[] {
+          "1970-01-01T00:00:00.001Z,tag1_1,tag2_1,1,1,",
+          "1970-01-01T00:00:00.002Z,tag1_1,tag2_1,2,2,",
+          "1970-01-01T00:00:00.001Z,tag1_2,tag2_2,1,1,",
+          "1970-01-01T00:00:00.002Z,tag1_2,tag2_2,2,2,",
+        };
+    tableResultSetEqualTest(
+        "SELECT time, tag1, tag2, s1, s2 FROM read_tsfile(PATHS => '"
+            + toSqlPath(tsFile)
+            + "', TABLE_NAME => 'table1') ORDER BY tag1, tag2, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testReadTsFileWithNoMatchedDevice() throws Exception {
+    File tsFile = new File(tmpDir, "empty-device.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile)) {
+      generateTable(
+          writer, "table1", Arrays.asList("tag1", "tag2"), Arrays.asList("s1", "s2"), 1, 2);
+    }
+
+    tableResultSetEqualTest(
+        "SELECT time, tag1, tag2, s1, s2 FROM read_tsfile(PATHS => '"
+            + toSqlPath(tsFile)
+            + "', TABLE_NAME => 'table1') WHERE tag1 = 'not_exists'",
+        new String[] {"time", "tag1", "tag2", "s1", "s2"},
+        new String[] {},
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testReadTsFileAggregationWithNoMatchedDevice() throws Exception {
+    File tsFile = new File(tmpDir, "empty-device-aggregation.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile)) {
+      generateTable(
+          writer, "table1", Arrays.asList("tag1", "tag2"), Arrays.asList("s1", "s2"), 1, 2);
+    }
+
+    tableResultSetEqualTest(
+        "SELECT count(*) AS count_star, count(s1) AS count_s1, sum(s1) AS sum_s1"
+            + " FROM read_tsfile(PATHS => '"
+            + toSqlPath(tsFile)
+            + "', TABLE_NAME => 'table1') WHERE tag1 = 'not_exists'",
+        new String[] {"count_star", "count_s1", "sum_s1"},
+        new String[] {"0,0,null,"},
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testReadMultipleTsFilesWithDeviceFilter() throws Exception {
+    File tsFile1 = new File(tmpDir, "multi-1.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile1)) {
+      generateTable(
+          writer, "table1", Arrays.asList("tag1", "tag2"), Arrays.asList("s1", "s2"), 1, 2);
+    }
+    File tsFile2 = new File(tmpDir, "multi-2.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile2)) {
+      generateTable(
+          writer, "table1", Arrays.asList("tag1", "tag2"), Arrays.asList("s1", "s2"), 3, 4);
+    }
+
+    String[] expectedHeader = new String[] {"time", "tag1", "tag2", "s1", "s2"};
+    String[] retArray =
+        new String[] {
+          "1970-01-01T00:00:00.001Z,tag1_1,tag2_1,1,1,",
+          "1970-01-01T00:00:00.002Z,tag1_1,tag2_1,2,2,",
+          "1970-01-01T00:00:00.003Z,tag1_1,tag2_1,3,3,",
+          "1970-01-01T00:00:00.004Z,tag1_1,tag2_1,4,4,",
+        };
+    tableResultSetEqualTest(
+        "SELECT time, tag1, tag2, s1, s2 FROM read_tsfile(PATHS => '"
+            + toSqlPath(tsFile1)
+            + ","
+            + toSqlPath(tsFile2)
+            + "', TABLE_NAME => 'table1') WHERE tag1 = 'tag1_1' ORDER BY time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testReadMultipleTsFilesWithSchemaMerge() throws Exception {
+    File tsFile1 = new File(tmpDir, "schema-merge-1.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile1)) {
+      generateTable(writer, "table1", Arrays.asList("tag1"), Arrays.asList("s1"), 1, 2);
+    }
+    File tsFile2 = new File(tmpDir, "schema-merge-2.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile2)) {
+      generateTable(writer, "table1", Arrays.asList("tag1"), Arrays.asList("s1", "s2"), 3, 4);
+    }
+
+    String[] expectedHeader = new String[] {"time", "tag1", "s1", "s2"};
+    String[] retArray =
+        new String[] {
+          "1970-01-01T00:00:00.001Z,tag1_1,1,null,",
+          "1970-01-01T00:00:00.002Z,tag1_1,2,null,",
+          "1970-01-01T00:00:00.003Z,tag1_1,3,3,",
+          "1970-01-01T00:00:00.004Z,tag1_1,4,4,",
+          "1970-01-01T00:00:00.001Z,tag1_2,1,null,",
+          "1970-01-01T00:00:00.002Z,tag1_2,2,null,",
+          "1970-01-01T00:00:00.003Z,tag1_2,3,3,",
+          "1970-01-01T00:00:00.004Z,tag1_2,4,4,",
+        };
+    tableResultSetEqualTest(
+        "SELECT time, tag1, s1, s2 FROM read_tsfile(PATHS => '"
+            + toSqlPath(tsFile1)
+            + ","
+            + toSqlPath(tsFile2)
+            + "', TABLE_NAME => 'table1') ORDER BY tag1, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testReadMultipleTsFilesWithConflictingFieldType() throws Exception {
+    File tsFile1 = new File(tmpDir, "conflict-1.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile1)) {
+      generateTable(writer, "table1", Arrays.asList("tag1"), Arrays.asList("s1"), 1, 2);
+    }
+    File tsFile2 = new File(tmpDir, "conflict-2.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile2)) {
+      generateTable(
+          writer, "table1", Arrays.asList("tag1"), Arrays.asList("s1"), TSDataType.DOUBLE, 3, 4);
+    }
+
+    tableAssertTestFail(
+        "SELECT * FROM read_tsfile(PATHS => '"
+            + toSqlPath(tsFile1)
+            + ","
+            + toSqlPath(tsFile2)
+            + "', TABLE_NAME => 'table1')",
+        "has conflicting data types when merging table schema",
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testReadMultipleTsFilesWithConflictingTagColumns() throws Exception {
+    File tsFile1 = new File(tmpDir, "tag-conflict-1.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile1)) {
+      generateTable(writer, "table1", Arrays.asList("tag1"), Arrays.asList("s1"), 1, 2);
+    }
+    File tsFile2 = new File(tmpDir, "tag-conflict-2.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile2)) {
+      generateTable(writer, "table1", Arrays.asList("tag2"), Arrays.asList("s1"), 3, 4);
+    }
+
+    tableAssertTestFail(
+        "SELECT * FROM read_tsfile(PATHS => '"
+            + toSqlPath(tsFile1)
+            + ","
+            + toSqlPath(tsFile2)
+            + "', TABLE_NAME => 'table1')",
+        "Tag columns conflict when merging table schema",
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testReadTsFileWithoutTableNameWhenMultipleTablesExist() throws Exception {
+    File tsFile = new File(tmpDir, "multiple-tables.tsfile");
+    try (TsFileWriter writer = new TsFileWriter(tsFile)) {
+      generateTable(writer, "table1", Arrays.asList("tag1"), Arrays.asList("s1"), 1, 2);
+      generateTable(writer, "table2", Arrays.asList("tag1"), Arrays.asList("s1"), 1, 2);
+    }
+
+    tableAssertTestFail(
+        "SELECT * FROM read_tsfile(PATHS => '" + toSqlPath(tsFile) + "')",
+        "Cannot infer table name from TsFile because multiple tables are found",
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testReadTsFileWithInvalidPaths() throws IOException {
+    File missingFile = new File(tmpDir, "missing.tsfile");
+    tableAssertTestFail(
+        "SELECT * FROM read_tsfile(PATHS => '" + toSqlPath(missingFile) + "')",
+        "TsFile path does not exist",
+        DATABASE_NAME);
+
+    DataNodeWrapper dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0);
+    File dataDir =
+        new File(dataNodeWrapper.getDataNodeDir() + File.separator + "data", "forbidden.tsfile");
+    Files.createDirectories(dataDir.getParentFile().toPath());
+    Files.write(dataDir.toPath(), new byte[0]);
+    tableAssertTestFail(
+        "SELECT * FROM read_tsfile(PATHS => '" + toSqlPath(dataDir) + "')",
+        "is not allowed because it may access IoTDB data directory",
+        DATABASE_NAME);
+  }
+
+  private static void generateTable(
+      TsFileWriter writer,
+      String tableName,
+      List<String> tagColumns,
+      List<String> fieldColumns,
+      long startTime,
+      long endTime)
+      throws IOException, WriteProcessException {
+    generateTable(
+        writer, tableName, tagColumns, fieldColumns, TSDataType.INT64, startTime, endTime);
+  }
+
+  private static void generateTable(
+      TsFileWriter writer,
+      String tableName,
+      List<String> tagColumns,
+      List<String> fieldColumns,
+      TSDataType fieldType,
+      long startTime,
+      long endTime)
+      throws IOException, WriteProcessException {
+    List<String> columnNames = new ArrayList<>(tagColumns.size() + fieldColumns.size());
+    List<TSDataType> columnTypes = new ArrayList<>(tagColumns.size() + fieldColumns.size());
+    List<ColumnCategory> columnCategories =
+        new ArrayList<>(tagColumns.size() + fieldColumns.size());
+    for (String tagColumn : tagColumns) {
+      columnNames.add(tagColumn);
+      columnTypes.add(TSDataType.STRING);
+      columnCategories.add(ColumnCategory.TAG);
+    }
+    for (String fieldColumn : fieldColumns) {
+      columnNames.add(fieldColumn);
+      columnTypes.add(fieldType);
+      columnCategories.add(ColumnCategory.FIELD);
+    }
+
+    writer.registerTableSchema(
+        new TableSchema(tableName, columnNames, columnTypes, columnCategories));
+    Tablet tablet = new Tablet(tableName, columnNames, columnTypes, columnCategories);
+    for (int deviceIndex = 1; deviceIndex <= 2; deviceIndex++) {
+      for (long time = startTime; time <= endTime; time++) {
+        int row = tablet.getRowSize();
+        tablet.addTimestamp(row, time);
+        for (int i = 0; i < tagColumns.size(); i++) {
+          tablet.addValue(row, i, tagColumns.get(i) + "_" + deviceIndex);
+        }
+        for (int i = 0; i < fieldColumns.size(); i++) {
+          addFieldValue(tablet, row, tagColumns.size() + i, fieldType, time);
+        }
+        if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
+          writer.writeTable(tablet);
+          tablet.reset();
+        }
+      }
+    }
+    if (tablet.getRowSize() != 0) {
+      writer.writeTable(tablet);
+    }
+  }
+
+  private static void addFieldValue(
+      Tablet tablet, int row, int column, TSDataType fieldType, long time) {
+    if (fieldType == TSDataType.DOUBLE) {
+      tablet.addValue(row, column, (double) time);
+      return;
+    }
+    tablet.addValue(row, column, time);
+  }
+
+  private static String toSqlPath(File file) {
+    return file.getAbsolutePath().replace("\\", "\\\\").replace("'", "''");
+  }
+
+  private static void clearTmpDir() {
+    if (tmpDir == null || !tmpDir.exists()) {
+      return;
+    }
+    File[] files = tmpDir.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        try {
+          Files.delete(file.toPath());
+        } catch (IOException ignored) {
+          // ignore
+        }
+      }
+    }
+  }
+
+  private static void deleteTmpDir() {
+    clearTmpDir();
+    if (tmpDir == null || !tmpDir.exists()) {
+      return;
+    }
+    try {
+      Files.delete(tmpDir.toPath());
+    } catch (IOException ignored) {
+      // ignore
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 8ae3e885f5e..24c28e668b1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -46,6 +46,8 @@ import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
 import 
org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
+import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryDataSource;
+import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
@@ -56,8 +58,6 @@ import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegio
 import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
-import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
 import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;
@@ -71,8 +71,6 @@ import org.apache.tsfile.utils.RamUsageEstimator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
 import java.time.ZoneId;
 import java.time.format.DateTimeParseException;
 import java.util.ArrayList;
@@ -121,8 +119,7 @@ public class FragmentInstanceContext extends QueryContext {
   // Used for region scan, relating methods are to be added.
   private Map<IDeviceID, DeviceContext> devicePathsToContext;
 
-  private List<String> externalTsFilePaths;
-  private List<TsFileResource> externalTsFileResources;
+  private ExternalTsFileQueryResource externalTsFileQueryResource;
 
   // Shared by all scan operators in this fragment instance to avoid memory 
problem
   protected IQueryDataSource sharedQueryDataSource;
@@ -624,12 +621,26 @@ public class FragmentInstanceContext extends QueryContext {
     this.devicePathsToContext = devicePathsToContext;
   }
 
-  public void addExternalTsFilePaths(List<String> externalTsFilePaths) {
-    if (this.externalTsFilePaths == null) {
-      this.externalTsFilePaths = new ArrayList<>(externalTsFilePaths);
-      return;
+  public void addExternalTsFileQueryResource(
+      ExternalTsFileQueryResource externalTsFileQueryResource) {
+    this.externalTsFileQueryResource = externalTsFileQueryResource;
+  }
+
+  public boolean initExternalTsFileQueryDataSource(
+      ExternalTsFileQueryResource externalTsFileQueryResource) {
+    long startTime = System.nanoTime();
+    try {
+      if (externalTsFileQueryResource == null) {
+        this.sharedQueryDataSource = EMPTY_QUERY_DATA_SOURCE;
+        return true;
+      }
+
+      this.sharedQueryDataSource = new ExternalTsFileQueryDataSource(externalTsFileQueryResource);
+      closedUnseqFileNum = externalTsFileQueryResource.getTsFileResources().size();
+      return true;
+    } finally {
+      addInitQueryDataSourceCost(System.nanoTime() - startTime);
     }
-    this.externalTsFilePaths.addAll(externalTsFilePaths);
   }
 
   public MemoryReservationManager getMemoryReservationContext() {
@@ -799,44 +810,6 @@ public class FragmentInstanceContext extends QueryContext {
     }
   }
 
-  public boolean initExternalTsFileQueryDataSource(List<String> 
externalTsFilePaths)
-      throws QueryProcessException {
-    long startTime = System.nanoTime();
-    try {
-      if (externalTsFilePaths == null || externalTsFilePaths.isEmpty()) {
-        this.sharedQueryDataSource = EMPTY_QUERY_DATA_SOURCE;
-        return true;
-      }
-
-      externalTsFileResources = new ArrayList<>(externalTsFilePaths.size());
-      for (String externalTsFilePath : externalTsFilePaths) {
-        TsFileResource resource =
-            new TsFileResource(new File(externalTsFilePath), 
TsFileResourceStatus.NORMAL);
-        if (resource.resourceFileExists()) {
-          try {
-            resource.deserialize();
-          } catch (IOException e) {
-            throw new QueryProcessException(
-                "Failed to deserialize external TsFile resource: "
-                    + externalTsFilePath
-                    + ", "
-                    + e.getMessage());
-          }
-        } else {
-          resource.setTimeIndex(new FileTimeIndex(Long.MIN_VALUE, 
Long.MAX_VALUE));
-        }
-        externalTsFileResources.add(resource);
-      }
-
-      this.sharedQueryDataSource =
-          new QueryDataSource(Collections.emptyList(), 
externalTsFileResources);
-      closedUnseqFileNum = externalTsFileResources.size();
-      return true;
-    } finally {
-      addInitQueryDataSourceCost(System.nanoTime() - startTime);
-    }
-  }
-
   public synchronized IQueryDataSource getSharedQueryDataSource() throws 
QueryProcessException {
     if (sharedQueryDataSource == null) {
       switch (queryDataSourceType) {
@@ -863,8 +836,8 @@ public class FragmentInstanceContext extends QueryContext {
           }
           break;
         case EXTERNAL_TSFILE_SCAN:
-          if (initExternalTsFileQueryDataSource(externalTsFilePaths)) {
-            externalTsFilePaths = null;
+          if (initExternalTsFileQueryDataSource(externalTsFileQueryResource)) {
+            externalTsFileQueryResource = null;
           } else {
             return getUnfinishedQueryDataSource();
           }
@@ -1081,9 +1054,7 @@ public class FragmentInstanceContext extends QueryContext {
       unClosedFilePaths = null;
     }
 
-    if (externalTsFileResources != null) {
-      externalTsFileResources = null;
-    }
+    externalTsFileQueryResource = null;
 
     // release TVList/AlignedTVList owned by current query
     releaseTVListOwnedByQuery();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
index e3030c0752c..031c10dc8a4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
@@ -251,18 +251,9 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe
       // all data of current device has been consumed
       updateResultTsBlock();
       timeIterator.resetCurTimeRange();
-      nextDevice();
-
-      if (currentDeviceIndex < deviceCount) {
-        // construct AlignedSeriesScanUtil for next device
-        constructAlignedSeriesScanUtil();
-        queryDataSource.reset();
-        this.seriesScanUtil.initQueryDataSource(queryDataSource);
-      }
+      moveToNextDevice();
 
       if (currentDeviceIndex >= deviceCount) {
-        // all devices have been consumed
-        timeIterator.setFinished();
         return Optional.of(true);
       } else {
         return Optional.of(false);
@@ -715,22 +706,25 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe
     if (allAggregatorsHasFinalResult
         && (timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR
             || tableAggregators.isEmpty())) {
-      nextDevice();
-      inputTsBlock = null;
-
-      if (currentDeviceIndex < deviceCount) {
-        // construct AlignedSeriesScanUtil for next device
-        constructAlignedSeriesScanUtil();
-        queryDataSource.reset();
-        this.seriesScanUtil.initQueryDataSource(queryDataSource);
-      }
+      moveToNextDevice();
+      allAggregatorsHasFinalResult = false;
+    }
+  }
 
-      if (currentDeviceIndex >= deviceCount) {
-        // all devices have been consumed
-        timeIterator.setFinished();
-      }
+  protected void moveToNextDevice() throws Exception {
+    nextDevice();
+    inputTsBlock = null;
 
-      allAggregatorsHasFinalResult = false;
+    if (currentDeviceIndex < deviceCount) {
+      // construct AlignedSeriesScanUtil for next device
+      constructAlignedSeriesScanUtil();
+      queryDataSource.reset();
+      this.seriesScanUtil.initQueryDataSource(queryDataSource);
+    }
+
+    if (currentDeviceIndex >= deviceCount) {
+      // all devices have been consumed
+      timeIterator.setFinished();
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDeviceTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDeviceTableScanOperator.java
deleted file mode 100644
index e4b119291ae..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDeviceTableScanOperator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
-
-import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils;
-import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
-
-import org.apache.tsfile.utils.RamUsageEstimator;
-
-import java.util.List;
-
-import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER;
-
-public abstract class AbstractDeviceTableScanOperator extends 
AbstractTableScanOperator {
-  private static final long INSTANCE_SIZE =
-      
RamUsageEstimator.shallowSizeOfInstance(AbstractDeviceTableScanOperator.class);
-
-  protected List<DeviceEntry> deviceEntries;
-  protected int deviceCount;
-  protected int currentDeviceIndex;
-
-  protected AbstractDeviceTableScanOperator(AbstractTableScanOperatorParameter 
parameter) {
-    super(parameter);
-    this.deviceEntries = parameter.deviceEntries;
-    this.deviceCount = parameter.deviceEntries.size();
-    this.currentDeviceIndex = 0;
-    this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, 
Integer.toString(this.deviceCount));
-    recordCurrentDeviceIndex();
-    constructAlignedSeriesScanUtil();
-  }
-
-  @Override
-  protected boolean hasCurrentDeviceEntry() {
-    return currentDeviceIndex < deviceCount;
-  }
-
-  @Override
-  protected DeviceEntry getCurrentDeviceEntry() {
-    return deviceEntries.get(currentDeviceIndex);
-  }
-
-  @Override
-  protected boolean advanceDeviceEntry() {
-    currentDeviceIndex++;
-    return hasCurrentDeviceEntry();
-  }
-
-  @Override
-  protected void recordCurrentDeviceIndex() {
-    this.operatorContext.recordSpecifiedInfo(
-        CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING, 
Integer.toString(currentDeviceIndex));
-  }
-
-  @Override
-  public long ramBytesUsed() {
-    return super.ramBytesUsed()
-        + INSTANCE_SIZE
-        - AbstractTableScanOperator.INSTANCE_SIZE
-        + RamUsageEstimator.sizeOfCollection(deviceEntries);
-  }
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java
index 45481e95f11..523f0d1367a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 
+import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils;
 import org.apache.iotdb.commons.path.AlignedFullPath;
 import org.apache.iotdb.commons.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -50,15 +51,20 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator.appendDataIntoBuilder;
+import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER;
 
 public abstract class AbstractTableScanOperator extends 
AbstractSeriesScanOperator {
   protected static final long INSTANCE_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(AbstractTableScanOperator.class);
+      RamUsageEstimator.shallowSizeOfInstance(TableScanOperator.class);
 
   private final List<ColumnSchema> columnSchemas;
 
   private final int[] columnsIndexArray;
 
+  protected final List<DeviceEntry> deviceEntries;
+
+  protected final int deviceCount;
+
   protected final Ordering scanOrder;
   protected final SeriesScanOptions seriesScanOptions;
 
@@ -78,11 +84,16 @@ public abstract class AbstractTableScanOperator extends 
AbstractSeriesScanOperat
 
   private QueryDataSource queryDataSource;
 
+  protected int currentDeviceIndex;
+
   public AbstractTableScanOperator(AbstractTableScanOperatorParameter 
parameter) {
     this.sourceId = parameter.sourceId;
     this.operatorContext = parameter.context;
     this.columnSchemas = parameter.columnSchemas;
     this.columnsIndexArray = parameter.columnsIndexArray;
+    this.deviceEntries = parameter.deviceEntries;
+    this.deviceCount = parameter.deviceEntries.size();
+    this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, 
Integer.toString(this.deviceCount));
     this.scanOrder = parameter.scanOrder;
     this.seriesScanOptions = parameter.seriesScanOptions;
     this.measurementColumnNames = parameter.measurementColumnNames;
@@ -92,12 +103,17 @@ public abstract class AbstractTableScanOperator extends 
AbstractSeriesScanOperat
         parameter.measurementSchemas.stream()
             .map(IMeasurementSchema::getType)
             .collect(Collectors.toList());
+    this.currentDeviceIndex = 0;
+    this.operatorContext.recordSpecifiedInfo(
+        CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING, Integer.toString(0));
+
     // allSensors include time and all field columns
     this.maxReturnSize =
         Math.min(
             maxReturnSize,
             allSensors.size() * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
     this.maxTsBlockLineNum = parameter.maxTsBlockLineNum;
+    constructAlignedSeriesScanUtil();
   }
 
   @Override
@@ -181,7 +197,7 @@ public abstract class AbstractTableScanOperator extends 
AbstractSeriesScanOperat
   }
 
   private void constructResultTsBlock() {
-    DeviceEntry currentDeviceEntry = getCurrentDeviceEntry();
+    DeviceEntry currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
     this.resultTsBlock =
         MeasurementToTableViewAdaptorUtils.toTableBlock(
             measurementDataBlock,
@@ -201,7 +217,7 @@ public abstract class AbstractTableScanOperator extends 
AbstractSeriesScanOperat
   @Override
   public boolean isFinished() throws Exception {
     return (retainedTsBlock == null)
-        && (!hasCurrentDeviceEntry() || seriesScanOptions.limitConsumedUp());
+        && (currentDeviceIndex >= deviceCount || 
seriesScanOptions.limitConsumedUp());
   }
 
   @Override
@@ -233,37 +249,32 @@ public abstract class AbstractTableScanOperator extends 
AbstractSeriesScanOperat
     
this.measurementDataBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum);
   }
 
-  private void moveToNextDevice() {
-    if (advanceDeviceEntry()) {
+  protected void moveToNextDevice() {
+    currentDeviceIndex++;
+    if (currentDeviceIndex < deviceCount) {
       // construct AlignedSeriesScanUtil for next device
       constructAlignedSeriesScanUtil();
 
       // reset QueryDataSource
       queryDataSource.reset();
       this.seriesScanUtil.initQueryDataSource(queryDataSource);
-      recordCurrentDeviceIndex();
+      this.operatorContext.recordSpecifiedInfo(
+          CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING, 
Integer.toString(currentDeviceIndex));
     }
   }
 
-  protected abstract boolean hasCurrentDeviceEntry();
-
-  protected abstract DeviceEntry getCurrentDeviceEntry();
-
-  protected abstract boolean advanceDeviceEntry();
-
-  protected abstract void recordCurrentDeviceIndex();
-
   protected void constructAlignedSeriesScanUtil() {
-    if (!hasCurrentDeviceEntry()) {
+    if (this.deviceEntries.isEmpty() || currentDeviceIndex >= deviceCount) {
       // no need to construct SeriesScanUtil, hasNext will return false
       return;
     }
 
-    if (getCurrentDeviceEntry() == null) {
-      throw new IllegalStateException("Current device entry in 
TableScanOperator is empty");
+    if (this.deviceEntries.get(this.currentDeviceIndex) == null) {
+      throw new IllegalStateException(
+          "Device entries of index " + this.currentDeviceIndex + " in 
TableScanOperator is empty");
     }
 
-    DeviceEntry deviceEntry = getCurrentDeviceEntry();
+    DeviceEntry deviceEntry = this.deviceEntries.get(this.currentDeviceIndex);
     AlignedFullPath alignedPath =
         constructAlignedPath(deviceEntry, measurementColumnNames, 
measurementSchemas, allSensors);
     this.seriesScanUtil =
@@ -291,7 +302,8 @@ public abstract class AbstractTableScanOperator extends 
AbstractSeriesScanOperat
         + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil)
         + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
         + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId)
-        + (resultTsBlockBuilder == null ? 0 : 
resultTsBlockBuilder.getRetainedSizeInBytes());
+        + (resultTsBlockBuilder == null ? 0 : 
resultTsBlockBuilder.getRetainedSizeInBytes())
+        + RamUsageEstimator.sizeOfCollection(deviceEntries);
   }
 
   public static class AbstractTableScanOperatorParameter {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java
index 3b4b2704103..f6e4a84fc75 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java
@@ -22,11 +22,12 @@ package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 import org.apache.iotdb.commons.path.AlignedFullPath;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanUtil;
+import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryDataSource;
 import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource;
-import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceOffset;
-import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.MultiWayMergeReader;
+import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceTaskRunReader;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata;
@@ -34,8 +35,6 @@ import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
 
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
 
@@ -43,39 +42,19 @@ public class ExternalTsFileAggTableScanOperator extends 
DefaultAggTableScanOpera
   private static final long INSTANCE_SIZE =
       
RamUsageEstimator.shallowSizeOfInstance(ExternalTsFileAggTableScanOperator.class);
 
-  private final String tableName;
-  private final ExternalTsFileQueryResource externalTsFileQueryResource;
   private final int deviceTaskPartitionIndex;
-  private MultiWayMergeReader deviceTaskReader;
-  private int loadedDeviceOffsetIndex = -1;
-  private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList();
+  private DeviceTaskRunReader deviceTaskReader;
 
   public ExternalTsFileAggTableScanOperator(
-      AbstractAggTableScanOperatorParameter parameter,
-      String tableName,
-      ExternalTsFileQueryResource externalTsFileQueryResource,
-      int deviceTaskPartitionIndex) {
+      AbstractAggTableScanOperatorParameter parameter, int 
deviceTaskPartitionIndex) {
     super(parameter);
-    this.tableName = tableName;
-    this.externalTsFileQueryResource = externalTsFileQueryResource;
     this.deviceTaskPartitionIndex = deviceTaskPartitionIndex;
   }
 
-  @Override
-  String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) {
-    int segmentOffset =
-        deviceEntry.getDeviceID().segmentNum() > 0
-                && tableName.equalsIgnoreCase((String) 
deviceEntry.getNthSegment(0))
-            ? 1
-            : 0;
-    Object segment = deviceEntry.getNthSegment(idColumnIndex + segmentOffset);
-    return segment == null ? null : (String) segment;
-  }
-
   @Override
   protected void constructAlignedSeriesScanUtil() {
     DeviceEntry deviceEntry =
-        deviceEntries.isEmpty() || deviceEntries.get(currentDeviceIndex) == 
null
+        !hasCurrentRealDeviceEntry()
             ? new AlignedDeviceEntry(SeriesScanUtil.EMPTY_DEVICE_ID, new 
Binary[0])
             : deviceEntries.get(currentDeviceIndex);
     this.seriesScanUtil =
@@ -92,39 +71,75 @@ public class ExternalTsFileAggTableScanOperator extends 
DefaultAggTableScanOpera
 
   private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata(
       TsFileResource resource, AlignedFullPath alignedPath) throws IOException 
{
-    if (deviceEntries.isEmpty() || currentDeviceIndex >= deviceEntries.size()) 
{
+    if (!hasCurrentRealDeviceEntry()) {
       return null;
     }
     return ExternalTsFileSeriesScanUtil.loadTimeSeriesMetadata(
         resource,
         alignedPath,
         deviceEntries.get(currentDeviceIndex).getDeviceID(),
-        getCurrentDeviceOffsets(),
-        externalTsFileQueryResource.getTsFilePaths(),
+        deviceTaskReader.getCurrentDeviceOffsetMap().get(resource),
         ((OperatorContext) operatorContext).getInstanceContext(),
         seriesScanOptions.getGlobalTimeFilter());
   }
 
-  private List<DeviceOffset> getCurrentDeviceOffsets() throws IOException {
-    if (loadedDeviceOffsetIndex == currentDeviceIndex) {
-      return currentDeviceOffsets;
-    }
-    if (deviceTaskReader == null) {
+  @Override
+  public void initQueryDataSource(IQueryDataSource dataSource) {
+    ExternalTsFileQueryResource externalTsFileQueryResource =
+        ((ExternalTsFileQueryDataSource) 
dataSource).getExternalTsFileQueryResource();
+    if (hasCurrentRealDeviceEntry() && deviceTaskReader == null) {
       deviceTaskReader =
-          
externalTsFileQueryResource.getMultiWayMergeReader(deviceTaskPartitionIndex);
+          
externalTsFileQueryResource.getDeviceTaskRunReader(deviceTaskPartitionIndex);
     }
-    DeviceEntry currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
-    while (deviceTaskReader.hasNextDevice()) {
-      DeviceEntry deviceEntry = deviceTaskReader.nextDevice();
-      if (deviceEntry.getDeviceID().equals(currentDeviceEntry.getDeviceID())) {
-        currentDeviceOffsets = deviceTaskReader.getCurrentDeviceOffsets();
-        loadedDeviceOffsetIndex = currentDeviceIndex;
-        return currentDeviceOffsets;
+    super.initQueryDataSource(
+        hasCurrentRealDeviceEntry() ? updateCurrentDeviceQueryDataSource() : 
dataSource);
+  }
+
+  @Override
+  protected void moveToNextDevice() throws Exception {
+    nextDevice();
+    inputTsBlock = null;
+
+    if (currentDeviceIndex < deviceCount) {
+      constructAlignedSeriesScanUtil();
+      queryDataSource =
+          hasCurrentRealDeviceEntry() ? updateCurrentDeviceQueryDataSource() : 
queryDataSource;
+      seriesScanUtil.initQueryDataSource(queryDataSource);
+    }
+
+    if (currentDeviceIndex >= deviceCount) {
+      timeIterator.setFinished();
+    }
+  }
+
+  private boolean hasCurrentRealDeviceEntry() {
+    return !deviceEntries.isEmpty()
+        && currentDeviceIndex < deviceEntries.size()
+        && deviceEntries.get(currentDeviceIndex) != null;
+  }
+
+  private ExternalTsFileQueryDataSource updateCurrentDeviceQueryDataSource() {
+    try {
+      if (!deviceTaskReader.nextDevice()) {
+        throw new IllegalStateException(
+            "Unexpected end of external TsFile device task reader at device 
index "
+                + currentDeviceIndex);
+      }
+      DeviceEntry expectedDeviceEntry = deviceEntries.get(currentDeviceIndex);
+      DeviceEntry currentDeviceEntry = deviceTaskReader.getCurrentDevice();
+      if 
(!expectedDeviceEntry.getDeviceID().equals(currentDeviceEntry.getDeviceID())) {
+        throw new IllegalStateException(
+            String.format(
+                "External TsFile device task reader is not aligned with device 
entries at index %d:"
+                    + " expected %s but got %s",
+                currentDeviceIndex,
+                expectedDeviceEntry.getDeviceID(),
+                currentDeviceEntry.getDeviceID()));
       }
+      return deviceTaskReader.getCurrentDeviceQueryDataSource();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to update external TsFile device 
resources", e);
     }
-    currentDeviceOffsets = Collections.emptyList();
-    loadedDeviceOffsetIndex = currentDeviceIndex;
-    return currentDeviceOffsets;
   }
 
   @Override
@@ -138,9 +153,6 @@ public class ExternalTsFileAggTableScanOperator extends 
DefaultAggTableScanOpera
 
   @Override
   public long ramBytesUsed() {
-    return super.ramBytesUsed()
-        + INSTANCE_SIZE
-        - AbstractDefaultAggTableScanOperator.INSTANCE_SIZE
-        + RamUsageEstimator.sizeOfCollection(currentDeviceOffsets);
+    return super.ramBytesUsed() + INSTANCE_SIZE - 
AbstractDefaultAggTableScanOperator.INSTANCE_SIZE;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
index 6ad6aa2af3f..d4d327be703 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
@@ -86,20 +86,15 @@ public class ExternalTsFileSeriesScanUtil extends 
AlignedSeriesScanUtil {
       TsFileResource resource,
       AlignedFullPath alignedPath,
       IDeviceID currentDeviceID,
-      List<DeviceOffset> currentDeviceOffsets,
-      List<String> tsFilePaths,
+      DeviceOffset currentDeviceOffset,
       FragmentInstanceContext context,
       Filter globalTimeFilter)
       throws IOException {
-    if (currentDeviceOffsets == null || 
!currentDeviceID.equals(alignedPath.getDeviceId())) {
+    if (currentDeviceOffset == null || 
!currentDeviceID.equals(alignedPath.getDeviceId())) {
       return null;
     }
 
-    long[] deviceMeasurementNodeOffset =
-        getDeviceMeasurementNodeOffset(currentDeviceOffsets, tsFilePaths, 
resource.getTsFilePath());
-    if (deviceMeasurementNodeOffset == null) {
-      return null;
-    }
+    long[] deviceMeasurementNodeOffset = 
currentDeviceOffset.getMeasurementNodeOffset();
     // TODO: Use deviceMeasurementNodeOffset after FileLoaderUtils supports 
offset-based metadata
     // loading in this branch.
     return FileLoaderUtils.loadAlignedTimeSeriesMetadata(
@@ -111,16 +106,6 @@ public class ExternalTsFileSeriesScanUtil extends 
AlignedSeriesScanUtil {
         context.isIgnoreAllNullRows());
   }
 
-  private static long[] getDeviceMeasurementNodeOffset(
-      List<DeviceOffset> currentDeviceOffsets, List<String> tsFilePaths, 
String tsFilePath) {
-    for (DeviceOffset offset : currentDeviceOffsets) {
-      if (tsFilePath.equals(tsFilePaths.get(offset.getFileIndex()))) {
-        return offset.getMeasurementNodeOffset();
-      }
-    }
-    return null;
-  }
-
   @FunctionalInterface
   public interface ExternalTsFileMetadataLoader {
     AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
index ad14540e291..73a9cb11ad5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
@@ -19,63 +19,41 @@
 
 package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 
+import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils;
 import org.apache.iotdb.commons.path.AlignedFullPath;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryDataSource;
 import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource;
-import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceOffset;
-import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.MultiWayMergeReader;
+import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceTaskRunReader;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata;
 import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
 
 public class ExternalTsFileTableScanOperator extends TableScanOperator {
   private static final long INSTANCE_SIZE =
       
RamUsageEstimator.shallowSizeOfInstance(ExternalTsFileTableScanOperator.class);
-  private static final long ABSTRACT_DEVICE_TABLE_SCAN_OPERATOR_INSTANCE_SIZE =
-      
RamUsageEstimator.shallowSizeOfInstance(AbstractDeviceTableScanOperator.class);
 
-  private final String tableName;
-  private final ExternalTsFileQueryResource externalTsFileQueryResource;
   private final int deviceTaskPartitionIndex;
-  private MultiWayMergeReader deviceTaskReader;
-  private int loadedDeviceOffsetIndex = -1;
-  private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList();
+  private DeviceTaskRunReader deviceTaskReader;
 
   public ExternalTsFileTableScanOperator(
-      AbstractTableScanOperatorParameter parameter,
-      String tableName,
-      ExternalTsFileQueryResource externalTsFileQueryResource,
-      int deviceTaskPartitionIndex) {
+      AbstractTableScanOperatorParameter parameter, int 
deviceTaskPartitionIndex) {
     super(parameter);
-    this.tableName = tableName;
-    this.externalTsFileQueryResource = externalTsFileQueryResource;
     this.deviceTaskPartitionIndex = deviceTaskPartitionIndex;
   }
 
-  @Override
-  String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) {
-    int segmentOffset =
-        deviceEntry.getDeviceID().segmentNum() > 0
-                && tableName.equalsIgnoreCase((String) 
deviceEntry.getNthSegment(0))
-            ? 1
-            : 0;
-    Object segment = deviceEntry.getNthSegment(idColumnIndex + segmentOffset);
-    return segment == null ? null : (String) segment;
-  }
-
   @Override
   protected void constructAlignedSeriesScanUtil() {
-    if (!hasCurrentDeviceEntry()) {
+    if (currentDeviceIndex >= deviceCount) {
       return;
     }
 
-    DeviceEntry deviceEntry = getCurrentDeviceEntry();
+    DeviceEntry deviceEntry = deviceEntries.get(currentDeviceIndex);
     if (deviceEntry == null) {
       throw new IllegalStateException("Current device entry in 
TableScanOperator is empty");
     }
@@ -97,33 +75,58 @@ public class ExternalTsFileTableScanOperator extends 
TableScanOperator {
     return ExternalTsFileSeriesScanUtil.loadTimeSeriesMetadata(
         resource,
         alignedPath,
-        getCurrentDeviceEntry().getDeviceID(),
-        getCurrentDeviceOffsets(),
-        externalTsFileQueryResource.getTsFilePaths(),
+        deviceEntries.get(currentDeviceIndex).getDeviceID(),
+        deviceTaskReader.getCurrentDeviceOffsetMap().get(resource),
         ((OperatorContext) operatorContext).getInstanceContext(),
         seriesScanOptions.getGlobalTimeFilter());
   }
 
-  private List<DeviceOffset> getCurrentDeviceOffsets() throws IOException {
-    if (loadedDeviceOffsetIndex == currentDeviceIndex) {
-      return currentDeviceOffsets;
-    }
+  @Override
+  public void initQueryDataSource(IQueryDataSource dataSource) {
+    ExternalTsFileQueryResource externalTsFileQueryResource =
+        ((ExternalTsFileQueryDataSource) 
dataSource).getExternalTsFileQueryResource();
     if (deviceTaskReader == null) {
       deviceTaskReader =
-          
externalTsFileQueryResource.getMultiWayMergeReader(deviceTaskPartitionIndex);
+          
externalTsFileQueryResource.getDeviceTaskRunReader(deviceTaskPartitionIndex);
+    }
+    IQueryDataSource currentDataSource =
+        currentDeviceIndex < deviceCount ? 
updateCurrentDeviceQueryDataSource() : dataSource;
+    super.initQueryDataSource(currentDataSource);
+  }
+
+  @Override
+  protected void moveToNextDevice() {
+    currentDeviceIndex++;
+    if (currentDeviceIndex < deviceCount) {
+      constructAlignedSeriesScanUtil();
+      seriesScanUtil.initQueryDataSource(updateCurrentDeviceQueryDataSource());
+      this.operatorContext.recordSpecifiedInfo(
+          CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING, 
Integer.toString(currentDeviceIndex));
     }
-    DeviceEntry currentDeviceEntry = getCurrentDeviceEntry();
-    while (deviceTaskReader.hasNextDevice()) {
-      DeviceEntry deviceEntry = deviceTaskReader.nextDevice();
-      if (deviceEntry.getDeviceID().equals(currentDeviceEntry.getDeviceID())) {
-        currentDeviceOffsets = deviceTaskReader.getCurrentDeviceOffsets();
-        loadedDeviceOffsetIndex = currentDeviceIndex;
-        return currentDeviceOffsets;
+  }
+
+  private ExternalTsFileQueryDataSource updateCurrentDeviceQueryDataSource() {
+    try {
+      if (!deviceTaskReader.nextDevice()) {
+        throw new IllegalStateException(
+            "Unexpected end of external TsFile device task reader at device 
index "
+                + currentDeviceIndex);
+      }
+      DeviceEntry expectedDeviceEntry = deviceEntries.get(currentDeviceIndex);
+      DeviceEntry currentDeviceEntry = deviceTaskReader.getCurrentDevice();
+      if 
(!expectedDeviceEntry.getDeviceID().equals(currentDeviceEntry.getDeviceID())) {
+        throw new IllegalStateException(
+            String.format(
+                "External TsFile device task reader is not aligned with device 
entries at index %d:"
+                    + " expected %s but got %s",
+                currentDeviceIndex,
+                expectedDeviceEntry.getDeviceID(),
+                currentDeviceEntry.getDeviceID()));
       }
+      return deviceTaskReader.getCurrentDeviceQueryDataSource();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to update external TsFile device 
resources", e);
     }
-    currentDeviceOffsets = Collections.emptyList();
-    loadedDeviceOffsetIndex = currentDeviceIndex;
-    return currentDeviceOffsets;
   }
 
   @Override
@@ -137,9 +140,6 @@ public class ExternalTsFileTableScanOperator extends 
TableScanOperator {
 
   @Override
   public long ramBytesUsed() {
-    return super.ramBytesUsed()
-        + INSTANCE_SIZE
-        - ABSTRACT_DEVICE_TABLE_SCAN_OPERATOR_INSTANCE_SIZE
-        + RamUsageEstimator.sizeOfCollection(currentDeviceOffsets);
+    return super.ramBytesUsed() + INSTANCE_SIZE - 
AbstractTableScanOperator.INSTANCE_SIZE;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
index a50e978b377..11f10ef3dae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
@@ -21,7 +21,7 @@ package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 
-public class TableScanOperator extends AbstractDeviceTableScanOperator {
+public class TableScanOperator extends AbstractTableScanOperator {
   public TableScanOperator(AbstractTableScanOperatorParameter parameter) {
     super(parameter);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewScanOperator.java
index 12feaccc42f..03d0e998e71 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewScanOperator.java
@@ -23,7 +23,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 
 import org.apache.tsfile.file.metadata.IDeviceID;
 
-public class TreeAlignedDeviceViewScanOperator extends 
AbstractDeviceTableScanOperator {
+public class TreeAlignedDeviceViewScanOperator extends 
AbstractTableScanOperator {
 
   private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
index b0fc9e1d554..1bc40a2896b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
@@ -1128,6 +1128,13 @@ public class DataNodeTableOperatorGenerator
   @Override
   public Operator visitExternalTsFileScan(
       ExternalTsFileScanNode node, LocalExecutionPlanContext context) {
+    if (node.getDeviceEntries().isEmpty()) {
+      OperatorContext operatorContext =
+          addOperatorContext(
+              context, node.getPlanNodeId(), 
EmptyDataOperator.class.getSimpleName());
+      return new EmptyDataOperator(operatorContext);
+    }
+
     AbstractTableScanOperator.AbstractTableScanOperatorParameter parameter =
         constructAbstractTableScanOperatorParameter(
             node,
@@ -1137,11 +1144,7 @@ public class DataNodeTableOperatorGenerator
             Long.MAX_VALUE);
 
     AbstractTableScanOperator externalTsFileTableScanOperator =
-        new ExternalTsFileTableScanOperator(
-            parameter,
-            node.getQualifiedObjectName().getObjectName(),
-            node.getExternalTsFileQueryResource(),
-            node.getDeviceTaskPartitionIndex());
+        new ExternalTsFileTableScanOperator(parameter, 
node.getDeviceTaskPartitionIndex());
 
     
context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName());
 
@@ -1149,7 +1152,9 @@ public class DataNodeTableOperatorGenerator
     dataDriverContext.addSourceOperator(externalTsFileTableScanOperator);
     
dataDriverContext.setQueryDataSourceType(QueryDataSourceType.EXTERNAL_TSFILE_SCAN);
     dataDriverContext.setInputDriver(true);
-    context.getInstanceContext().addExternalTsFilePaths(node.getTsFilePaths());
+    context
+        .getInstanceContext()
+        .addExternalTsFileQueryResource(node.getExternalTsFileQueryResource());
 
     return externalTsFileTableScanOperator;
   }
@@ -1640,11 +1645,7 @@ public class DataNodeTableOperatorGenerator
         constructAbstractAggTableScanOperatorParameter(node, context);
 
     ExternalTsFileAggTableScanOperator aggTableScanOperator =
-        new ExternalTsFileAggTableScanOperator(
-            parameter,
-            node.getQualifiedObjectName().getObjectName(),
-            node.getExternalTsFileQueryResource(),
-            node.getDeviceTaskPartitionIndex());
+        new ExternalTsFileAggTableScanOperator(parameter, 
node.getDeviceTaskPartitionIndex());
 
     
context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName());
     addSource(
@@ -1658,7 +1659,9 @@ public class DataNodeTableOperatorGenerator
 
     DataDriverContext dataDriverContext = (DataDriverContext) 
context.getDriverContext();
     
dataDriverContext.setQueryDataSourceType(QueryDataSourceType.EXTERNAL_TSFILE_SCAN);
-    context.getInstanceContext().addExternalTsFilePaths(node.getTsFilePaths());
+    context
+        .getInstanceContext()
+        .addExternalTsFileQueryResource(node.getExternalTsFileQueryResource());
 
     return aggTableScanOperator;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryDataSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryDataSource.java
new file mode 100644
index 00000000000..ac55a8cbfe2
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryDataSource.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile;
+
+import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.filter.basic.Filter;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ExternalTsFileQueryDataSource extends QueryDataSource {
+
+  private final ExternalTsFileQueryResource externalTsFileQueryResource;
+
+  public ExternalTsFileQueryDataSource(ExternalTsFileQueryResource 
externalTsFileQueryResource) {
+    this(externalTsFileQueryResource, Collections.emptyList());
+  }
+
+  ExternalTsFileQueryDataSource(
+      ExternalTsFileQueryResource externalTsFileQueryResource,
+      List<TsFileResource> unseqResources) {
+    super(Collections.emptyList(), unseqResources);
+    this.externalTsFileQueryResource = externalTsFileQueryResource;
+  }
+
+  @Override
+  public IQueryDataSource clone() {
+    return new ExternalTsFileQueryDataSource(externalTsFileQueryResource, 
getUnseqResources());
+  }
+
+  public ExternalTsFileQueryResource getExternalTsFileQueryResource() {
+    return externalTsFileQueryResource;
+  }
+
+  @Override
+  public boolean isSeqSatisfied(
+      IDeviceID deviceID, int curIndex, Filter timeFilter, boolean debug) {
+    return true;
+  }
+
+  @Override
+  public boolean isUnSeqSatisfied(
+      IDeviceID deviceID, int curIndex, Filter timeFilter, boolean debug) {
+    return true;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
index 8371c6a1687..bd45d94e188 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
@@ -26,6 +26,9 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.Exte
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
 
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.read.LazyTsFileDeviceIterator;
@@ -38,7 +41,7 @@ import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.EOFException;
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -64,19 +67,20 @@ import static java.util.Objects.requireNonNull;
 public class ExternalTsFileQueryResource implements AutoCloseable {
 
   public static final String EXTERNAL_TSFILE_TMP_DIR = "external-tsfile";
-  private static final long DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES = 8L * 
1024 * 1024;
+  // Pending-size threshold (8 MiB) at which collectDeviceEntries flushes a partition.
+  private static final long DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES = 8L * 1024 * 1024;
   private static final long DEVICE_OFFSET_INSTANCE_SIZE_IN_BYTES = 32L;
 
   private final QueryId queryId;
   private final Path queryTempRoot;
   private final String tableName;
   private final List<String> tsFilePaths;
+  private final List<TsFileResource> tsFileResources;
   private final LongConsumer ioSizeRecorder;
   private final List<DeviceEntry> deviceEntries = new ArrayList<>();
   private List<DeviceTaskPartition> deviceTaskPartitions = 
Collections.emptyList();
   private Comparator<DeviceEntry> deviceEntryComparator;
 
-  private boolean readersRetained;
   private boolean closed;
 
   public ExternalTsFileQueryResource(
@@ -95,16 +99,19 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     this.tableName = tableName;
     this.tsFilePaths =
         Collections.unmodifiableList(new 
ArrayList<>(requireNonNull(tsFilePaths, "tsFilePaths")));
+    this.tsFileResources = createTsFileResources(this.tsFilePaths);
     this.ioSizeRecorder = requireNonNull(ioSizeRecorder, "ioSizeRecorder is 
null");
+    for (String tsFilePath : tsFilePaths) {
+      
FileReaderManager.getInstance().increaseExternalFileReaderReference(tsFilePath);
+    }
   }
 
-  public synchronized void collectDeviceEntries(
+  public void collectDeviceEntries(
       SchemaFilter schemaFilter, Comparator<DeviceEntry> comparator, int 
partitionCount) {
     checkNotClosed();
-    retainFileReaderReferences();
     ExternalTsFileDeviceFilterVisitor deviceFilterVisitor = new 
ExternalTsFileDeviceFilterVisitor();
     try (DeviceCollector deviceCollector = new DeviceCollector()) {
-      List<DeviceTaskPartition> partitions = 
createDeviceTaskPartitions(partitionCount);
+      createDeviceTaskPartitions(partitionCount);
       while (deviceCollector.hasNextDevice()) {
         IDeviceID deviceID = deviceCollector.nextDevice();
         if (schemaFilter != null
@@ -118,22 +125,23 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
             new DeviceTask(
                 deviceEntryIndex, new 
ArrayList<>(deviceCollector.getCurrentDeviceOffsets()));
         DeviceTaskPartition partition =
-            partitions.get(Math.floorMod(deviceID.hashCode(), 
partitions.size()));
+            deviceTaskPartitions.get(
+                Math.floorMod(deviceID.hashCode(), 
deviceTaskPartitions.size()));
         partition.add(deviceTask);
         if (partition.getEstimatedSizeInBytes() >= 
DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES) {
           partition.flush(comparator);
         }
       }
       deviceEntryComparator = comparator;
-      collectDeviceTaskPartitions(partitions, comparator);
+      collectDeviceTaskPartitions(comparator);
     }
   }
 
-  public synchronized MultiWayMergeReader getMultiWayMergeReader(int 
partitionIndex) {
+  public DeviceTaskRunReader getDeviceTaskRunReader(int partitionIndex) {
     checkNotClosed();
     DeviceTaskPartition partition = getDeviceTaskPartition(partitionIndex);
     try {
-      return new DeviceTaskRunReader(partition.getRunFiles(), deviceEntries, 
deviceEntryComparator);
+      return new DeviceTaskRunReader(partition);
     } catch (IOException e) {
       throw new RuntimeException("Failed to create external TsFile device task 
run reader", e);
     }
@@ -143,6 +151,10 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     return tsFilePaths;
   }
 
+  public List<TsFileResource> getTsFileResources() { // unmodifiable; same order as tsFilePaths
+    return tsFileResources;
+  }
+
   public List<DeviceEntry> getDeviceEntries() {
     return deviceEntries;
   }
@@ -162,7 +174,7 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
   }
 
   @Override
-  public synchronized void close() {
+  public void close() {
     if (closed) {
       return;
     }
@@ -175,24 +187,10 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     }
   }
 
-  private void retainFileReaderReferences() {
-    if (readersRetained) {
-      return;
-    }
-    for (String tsFilePath : tsFilePaths) {
-      
FileReaderManager.getInstance().increaseExternalFileReaderReference(tsFilePath);
-    }
-    readersRetained = true;
-  }
-
   private void releaseFileReaderReferences() {
-    if (!readersRetained) {
-      return;
-    }
     for (String tsFilePath : tsFilePaths) {
       
FileReaderManager.getInstance().decreaseExternalFileReaderReference(tsFilePath);
     }
-    readersRetained = false;
   }
 
   private void checkNotClosed() {
@@ -201,15 +199,28 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     }
   }
 
-  public interface MultiWayMergeReader extends AutoCloseable {
-    boolean hasNextDevice() throws IOException;
-
-    DeviceEntry nextDevice() throws IOException;
-
-    List<DeviceOffset> getCurrentDeviceOffsets();
-
-    @Override
-    void close() throws IOException;
+  private static List<TsFileResource> createTsFileResources(List<String> 
tsFilePaths) { // builds one TsFileResource per path, in the order of tsFilePaths
+    List<TsFileResource> tsFileResources = new ArrayList<>(tsFilePaths.size());
+    for (String tsFilePath : tsFilePaths) {
+      TsFileResource resource =
+          new TsFileResource(new File(tsFilePath), 
TsFileResourceStatus.NORMAL);
+      if (resource.resourceFileExists()) { // resource file present: populate via deserialize()
+        try {
+          resource.deserialize();
+        } catch (IOException e) { // rethrown unchecked with the path and the cause attached
+          throw new RuntimeException(
+              "Failed to deserialize external TsFile resource: "
+                  + tsFilePath
+                  + ", "
+                  + e.getMessage(),
+              e);
+        }
+      } else { // no resource file: presumably an unbounded time range - TODO confirm arg meaning
+        resource.setTimeIndex(new FileTimeIndex(Long.MIN_VALUE, 
Long.MAX_VALUE));
+      }
+      tsFileResources.add(resource);
+    }
+    return Collections.unmodifiableList(tsFileResources); // read-only view for callers
   }
 
   public class DeviceTaskPartition {
@@ -283,6 +294,16 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
       return !deviceEntryIndexes.isEmpty();
     }
 
+    private void finish(Comparator<DeviceEntry> comparator) { // registers still-pending tasks
+      if (pendingDeviceTasks.isEmpty()) {
+        return;
+      }
+      sortPendingDeviceTasks(comparator);
+      for (DeviceTask deviceTask : pendingDeviceTasks) { // list stays populated for later reads
+        deviceEntryIndexes.add(deviceTask.deviceEntryIndex);
+      }
+    }
+
     private void sortDeviceEntries(Comparator<DeviceEntry> comparator) {
       if (comparator != null) {
         deviceEntryIndexes.sort(
@@ -300,36 +321,34 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     private List<Path> getRunFiles() {
       return runFiles;
     }
+    // Read by DeviceTaskRunReader, which wraps the list in a MemoryDeviceTaskRunCursor.
+    private List<DeviceTask> getPendingDeviceTasks() {
+      return pendingDeviceTasks;
+    }
   }
 
-  private List<DeviceTaskPartition> createDeviceTaskPartitions(int 
partitionCount) {
+  private void createDeviceTaskPartitions(int partitionCount) {
     if (partitionCount <= 0) {
       throw new IllegalArgumentException(
           "External TsFile device task partition count must be positive");
     }
-    List<DeviceTaskPartition> partitions = new ArrayList<>(partitionCount);
+    deviceTaskPartitions = new ArrayList<>(partitionCount);
     for (int i = 0; i < partitionCount; i++) {
-      partitions.add(new DeviceTaskPartition(i));
+      deviceTaskPartitions.add(new DeviceTaskPartition(i));
     }
-    return partitions;
   }
 
-  private void collectDeviceTaskPartitions(
-      List<DeviceTaskPartition> partitions, Comparator<DeviceEntry> 
comparator) {
-    if (partitions.isEmpty()) {
-      deviceTaskPartitions = Collections.emptyList();
-      return;
-    }
-    List<DeviceTaskPartition> nonEmptyPartitions = new 
ArrayList<>(partitions.size());
-    for (DeviceTaskPartition partition : partitions) {
-      partition.flush(comparator);
+  private void collectDeviceTaskPartitions(Comparator<DeviceEntry> comparator) 
{
+    Iterator<DeviceTaskPartition> iterator = deviceTaskPartitions.iterator();
+    while (iterator.hasNext()) {
+      DeviceTaskPartition partition = iterator.next();
+      partition.finish(comparator);
       if (!partition.hasDeviceTasks()) {
+        iterator.remove();
         continue;
       }
       partition.sortDeviceEntries(comparator);
-      nonEmptyPartitions.add(partition);
     }
-    deviceTaskPartitions = nonEmptyPartitions;
   }
 
   private Path writeDeviceTaskRun(Path runRoot, int runIndex, List<DeviceTask> 
deviceTasks)
@@ -356,58 +375,74 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     return size;
   }
 
-  private static class DeviceTaskRunReader implements MultiWayMergeReader {
+  public class DeviceTaskRunReader implements AutoCloseable {
 
-    private final List<DeviceEntry> deviceEntries;
     private final PriorityQueue<DeviceTaskRunCursor> runCursors;
-    private DeviceTask nextDeviceTask;
-    private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList();
+    private DeviceEntry currentDevice;
+    private ExternalTsFileQueryDataSource currentDeviceQueryDataSource;
+    private Map<TsFileResource, DeviceOffset> currentDeviceOffsetMap;
 
-    private DeviceTaskRunReader(
-        List<Path> runFiles, List<DeviceEntry> deviceEntries, 
Comparator<DeviceEntry> comparator)
-        throws IOException {
-      this.deviceEntries = deviceEntries;
+    private DeviceTaskRunReader(DeviceTaskPartition partition) throws 
IOException {
       Comparator<DeviceTaskRunCursor> cursorComparator =
           (left, right) ->
-              comparator == null
+              deviceEntryComparator == null
                   ? left.getCurrentDeviceEntry()
                       .getDeviceID()
                       .compareTo(right.getCurrentDeviceEntry().getDeviceID())
-                  : comparator.compare(left.getCurrentDeviceEntry(), 
right.getCurrentDeviceEntry());
+                  : deviceEntryComparator.compare(
+                      left.getCurrentDeviceEntry(), 
right.getCurrentDeviceEntry());
       this.runCursors = new PriorityQueue<>(cursorComparator);
-      for (Path runFile : runFiles) {
-        DeviceTaskRunCursor cursor = new DeviceTaskRunCursor(runFile, 
deviceEntries);
+      for (Path runFile : partition.getRunFiles()) {
+        DeviceTaskRunCursor cursor = new DiskDeviceTaskRunCursor(runFile, 
deviceEntries);
         if (cursor.hasCurrentDeviceTask()) {
           runCursors.add(cursor);
         } else {
           cursor.close();
         }
       }
+      DeviceTaskRunCursor memoryCursor =
+          new MemoryDeviceTaskRunCursor(partition.getPendingDeviceTasks(), 
deviceEntries);
+      if (memoryCursor.hasCurrentDeviceTask()) {
+        runCursors.add(memoryCursor);
+      }
     }
 
-    @Override
-    public boolean hasNextDevice() throws IOException {
-      if (nextDeviceTask != null) {
-        return true;
+    public boolean nextDevice() throws IOException {
+      if (runCursors.isEmpty()) {
+        return false;
+      }
+      DeviceTaskRunCursor cursor = runCursors.poll();
+      DeviceTask result = cursor.getCurrentDeviceTask();
+      cursor.advance();
+      if (cursor.hasCurrentDeviceTask()) {
+        runCursors.add(cursor);
+      } else {
+        cursor.close();
       }
-      nextDeviceTask = readNextDeviceTask();
-      return nextDeviceTask != null;
-    }
 
-    @Override
-    public DeviceEntry nextDevice() throws IOException {
-      if (!hasNextDevice()) {
-        throw new EOFException("No more external TsFile device task");
+      currentDevice = deviceEntries.get(result.deviceEntryIndex);
+      List<TsFileResource> unseqResources = new 
ArrayList<>(result.deviceOffsets.size());
+      currentDeviceOffsetMap = new HashMap<>(result.deviceOffsets.size());
+      for (DeviceOffset deviceOffset : result.deviceOffsets) {
+        TsFileResource tsFileResource = 
tsFileResources.get(deviceOffset.getFileIndex());
+        unseqResources.add(tsFileResource);
+        currentDeviceOffsetMap.put(tsFileResource, deviceOffset);
       }
-      DeviceTask deviceTask = nextDeviceTask;
-      nextDeviceTask = null;
-      currentDeviceOffsets = deviceTask.deviceOffsets;
-      return deviceEntries.get(deviceTask.deviceEntryIndex);
+      currentDeviceQueryDataSource =
+          new ExternalTsFileQueryDataSource(ExternalTsFileQueryResource.this, 
unseqResources);
+      return true;
     }
 
-    @Override
-    public List<DeviceOffset> getCurrentDeviceOffsets() {
-      return currentDeviceOffsets;
+    public DeviceEntry getCurrentDevice() {
+      return currentDevice;
+    }
+
+    public ExternalTsFileQueryDataSource getCurrentDeviceQueryDataSource() {
+      return currentDeviceQueryDataSource;
+    }
+
+    public Map<TsFileResource, DeviceOffset> getCurrentDeviceOffsetMap() {
+      return currentDeviceOffsetMap;
     }
 
     @Override
@@ -428,31 +463,28 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
         throw exception;
       }
     }
+  }
 
-    private DeviceTask readNextDeviceTask() throws IOException {
-      if (runCursors.isEmpty()) {
-        return null;
-      }
-      DeviceTaskRunCursor cursor = runCursors.poll();
-      DeviceTask result = cursor.getCurrentDeviceTask();
-      cursor.advance();
-      if (cursor.hasCurrentDeviceTask()) {
-        runCursors.add(cursor);
-      } else {
-        cursor.close();
-      }
-      return result;
-    }
+  private interface DeviceTaskRunCursor extends Closeable {
+    // Cursor contract shared by DiskDeviceTaskRunCursor and MemoryDeviceTaskRunCursor.
+    boolean hasCurrentDeviceTask();
+    // Both implementations return null when hasCurrentDeviceTask() is false.
+    DeviceTask getCurrentDeviceTask();
+    // Used by DeviceTaskRunReader's comparator to order cursors in its priority queue.
+    DeviceEntry getCurrentDeviceEntry();
+    // Steps to the next task; afterwards hasCurrentDeviceTask() tells whether one exists.
+    void advance() throws IOException;
   }
 
-  private static class DeviceTaskRunCursor implements Closeable {
+  private static class DiskDeviceTaskRunCursor implements DeviceTaskRunCursor {
 
     private final List<DeviceEntry> deviceEntries;
     private final DataInputStream inputStream;
     private int remainingDeviceTasks;
     private DeviceTask currentDeviceTask;
 
-    private DeviceTaskRunCursor(Path runFile, List<DeviceEntry> deviceEntries) 
throws IOException {
+    private DiskDeviceTaskRunCursor(Path runFile, List<DeviceEntry> 
deviceEntries)
+        throws IOException {
       this.deviceEntries = deviceEntries;
       this.inputStream =
           new DataInputStream(new 
BufferedInputStream(Files.newInputStream(runFile)));
@@ -460,7 +492,8 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
       advance();
     }
 
-    private void advance() throws IOException {
+    @Override
+    public void advance() throws IOException {
       if (remainingDeviceTasks <= 0) {
         currentDeviceTask = null;
         return;
@@ -469,15 +502,18 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
       currentDeviceTask = DeviceTask.deserialize(inputStream);
     }
 
-    private boolean hasCurrentDeviceTask() {
+    @Override
+    public boolean hasCurrentDeviceTask() {
       return currentDeviceTask != null;
     }
 
-    private DeviceTask getCurrentDeviceTask() {
+    @Override
+    public DeviceTask getCurrentDeviceTask() {
       return currentDeviceTask;
     }
 
-    private DeviceEntry getCurrentDeviceEntry() {
+    @Override
+    public DeviceEntry getCurrentDeviceEntry() {
       return deviceEntries.get(currentDeviceTask.deviceEntryIndex);
     }
 
@@ -487,6 +523,50 @@ public class ExternalTsFileQueryResource implements 
AutoCloseable {
     }
   }
 
+  private static class MemoryDeviceTaskRunCursor implements 
DeviceTaskRunCursor {
+    // Cursor over an in-memory DeviceTask list (a partition's pending tasks in the reader).
+    private final List<DeviceTask> deviceTasks;
+    private final List<DeviceEntry> deviceEntries;
+    private int nextIndex;
+    private DeviceTask currentDeviceTask;
+    // Positions the cursor on the first task, if the list has one.
+    private MemoryDeviceTaskRunCursor(
+        List<DeviceTask> deviceTasks, List<DeviceEntry> deviceEntries) {
+      this.deviceTasks = deviceTasks;
+      this.deviceEntries = deviceEntries;
+      advance();
+    }
+    // Moves to the next task, or clears the current task once the list is exhausted.
+    @Override
+    public void advance() {
+      if (nextIndex >= deviceTasks.size()) {
+        currentDeviceTask = null;
+        return;
+      }
+      currentDeviceTask = deviceTasks.get(nextIndex++);
+    }
+    // True while a current task is set.
+    @Override
+    public boolean hasCurrentDeviceTask() {
+      return currentDeviceTask != null;
+    }
+    // May return null; check hasCurrentDeviceTask() first.
+    @Override
+    public DeviceTask getCurrentDeviceTask() {
+      return currentDeviceTask;
+    }
+    // Looks up the current task's deviceEntryIndex in deviceEntries; needs a current task.
+    @Override
+    public DeviceEntry getCurrentDeviceEntry() {
+      return deviceEntries.get(currentDeviceTask.deviceEntryIndex);
+    }
+    // Only drops the reference to the current task.
+    @Override
+    public void close() {
+      currentDeviceTask = null;
+    }
+  }
+
   private class DeviceCollector implements Closeable {
 
     private final Map<Integer, LazyTsFileDeviceIterator> deviceIteratorMap = 
new HashMap<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 0b54b2395a7..92d30d4f649 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -799,13 +799,23 @@ public class TableDistributedPlanGenerator
     if (partitions.isEmpty()) {
       return Collections.singletonList(node);
     }
+    boolean needFinalAggregation =
+        node.getStep() == SINGLE && node.getGroupingKeys().isEmpty() && 
partitions.size() > 1;
+    AggregationNode finalAggregation = null;
+    AggregationTableScanNode partialTemplateNode = node;
+    if (needFinalAggregation) {
+      Pair<AggregationNode, AggregationTableScanNode> splitResult =
+          split(node, symbolAllocator, queryId);
+      finalAggregation = splitResult.left;
+      partialTemplateNode = splitResult.right;
+    }
     List<PlanNode> result = new ArrayList<>(partitions.size());
     for (DeviceTaskPartition partition : partitions) {
       ExternalTsFileAggregationScanNode splitNode =
           new ExternalTsFileAggregationScanNode(
               queryId.genPlanNodeId(),
               node.getQualifiedObjectName(),
-              node.getOutputSymbols(),
+              partialTemplateNode.getOutputSymbols(),
               node.getAssignments(),
               node.getTagAndAttributeIndexMap(),
               node.getScanOrder(),
@@ -816,11 +826,11 @@ public class TableDistributedPlanGenerator
               node.isPushLimitToEachDevice(),
               node.containsNonAlignedDevice(),
               node.getProjection(),
-              node.getAggregations(),
-              node.getGroupingSets(),
-              node.getPreGroupedSymbols(),
-              node.getStep(),
-              node.getGroupIdSymbol(),
+              partialTemplateNode.getAggregations(),
+              partialTemplateNode.getGroupingSets(),
+              partialTemplateNode.getPreGroupedSymbols(),
+              partialTemplateNode.getStep(),
+              partialTemplateNode.getGroupIdSymbol(),
               node.getExternalTsFileQueryResource(),
               partition.getDeviceEntryIndexes(),
               partition.getPartitionIndex(),
@@ -830,6 +840,11 @@ public class TableDistributedPlanGenerator
     }
     sortPropertyContext.ifPresent(
         propertyContext -> applySortProperty(node, result, propertyContext, 
false));
+    if (needFinalAggregation) {
+      OrderingScheme childOrdering = 
nodeOrderingMap.get(result.get(0).getPlanNodeId());
+      
finalAggregation.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, 
result));
+      return Collections.singletonList(finalAggregation);
+    }
     return result;
   }
 

Reply via email to