This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch read_tsfile_table_function
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9cf449d0eefd4743b78269da9914b863305b9971
Author: shuwenwei <[email protected]>
AuthorDate: Wed Jun 10 09:54:49 2026 +0800

    add external tsfile resource
---
 .../db/queryengine/common/MPPQueryContext.java     |  51 ++
 .../fragment/FragmentInstanceContext.java          |   4 -
 .../ExternalTsFileAggTableScanOperator.java        |  59 +-
 .../relational/ExternalTsFileSeriesScanUtil.java   |  15 +-
 .../ExternalTsFileTableScanOperator.java           |  59 +-
 .../queryengine/plan/execution/QueryExecution.java |   1 +
 .../planner/DataNodeTableOperatorGenerator.java    |  10 +-
 .../plan/node/DataNodePlanNodeDeserializer.java    |   3 -
 .../DataNodeTableBuiltinTableFunction.java         |  10 +-
 .../readTsFile/ExternalTsFileQueryResource.java    | 634 +++++++++++++++++++++
 .../{ => readTsFile}/ReadTsFileTableFunction.java  |  30 +-
 .../plan/relational/planner/RelationPlanner.java   |  26 +-
 .../distribute/TableDistributedPlanGenerator.java  | 204 ++++---
 .../iterative/rule/PruneTableScanColumns.java      |   9 +-
 .../planner/node/AggregationTableScanNode.java     |  54 +-
 .../planner/node/DeviceTableScanNode.java          |   5 -
 .../node/ExternalTsFileAggregationScanNode.java    |  93 ++-
 .../planner/node/ExternalTsFileScanNode.java       | 238 ++------
 .../optimizations/PushPredicateIntoTableScan.java  |  48 +-
 .../optimizations/UnaliasSymbolReferences.java     |   9 +-
 20 files changed, 1057 insertions(+), 505 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 1e5bd2bff77..8b1303ef44e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -32,20 +32,27 @@ import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Identifier;
 import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Query;
 import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Table;
 import org.apache.iotdb.commons.queryengine.utils.cte.CteDataStore;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
 import org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager;
+import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource;
 import org.apache.iotdb.db.queryengine.statistics.QueryPlanStatistics;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.nio.file.Paths;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -62,6 +69,8 @@ import java.util.function.LongConsumer;
  * info and so on.
  */
 public class MPPQueryContext implements IAuditEntity {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MPPQueryContext.class);
+
   private String sql;
   private final QueryId queryId;
 
@@ -158,6 +167,8 @@ public class MPPQueryContext implements IAuditEntity {
   // Tables in the subquery
   private final Map<NodeRef<Query>, List<Identifier>> subQueryTables = new HashMap<>();
 
+  private List<ExternalTsFileQueryResource> externalTsFileQueryResources;
+
   @TestOnly
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
@@ -243,6 +254,46 @@ public class MPPQueryContext implements IAuditEntity {
     return queryId;
   }
 
+  public ExternalTsFileQueryResource createExternalTsFileQueryResource(
+      String tableName, List<String> tsFilePaths) {
+    if (externalTsFileQueryResources == null) {
+      externalTsFileQueryResources = new ArrayList<>();
+    }
+    ExternalTsFileQueryResource externalTsFileQueryResource =
+        new ExternalTsFileQueryResource(
+            queryId,
+            Paths.get(IoTDBDescriptor.getInstance().getConfig().getSortTmpDir())
+                .resolve(ExternalTsFileQueryResource.EXTERNAL_TSFILE_TMP_DIR)
+                .resolve(queryId.getId())
+                .resolve(String.valueOf(externalTsFileQueryResources.size())),
+            tableName,
+            tsFilePaths,
+            ignored -> {},
+            true);
+    externalTsFileQueryResources.add(externalTsFileQueryResource);
+    return externalTsFileQueryResource;
+  }
+
+  public void releaseExternalTsFileQueryResources() {
+    if (externalTsFileQueryResources == null) {
+      return;
+    }
+    for (ExternalTsFileQueryResource externalTsFileQueryResource : externalTsFileQueryResources) {
+      try {
+        externalTsFileQueryResource.close();
+      } catch (Exception e) {
+        LOGGER.warn("Failed to release external TsFile query resource", e);
+      }
+    }
+    FileUtils.deleteFileOrDirectory(
+        Paths.get(IoTDBDescriptor.getInstance().getConfig().getSortTmpDir())
+            .resolve(ExternalTsFileQueryResource.EXTERNAL_TSFILE_TMP_DIR)
+            .resolve(queryId.getId())
+            .toFile(),
+        true);
+    externalTsFileQueryResources = null;
+  }
+
   public long getLocalQueryId() {
     return localQueryId;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 654df2301d8..8ae3e885f5e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -826,7 +826,6 @@ public class FragmentInstanceContext extends QueryContext {
           resource.setTimeIndex(new FileTimeIndex(Long.MIN_VALUE, Long.MAX_VALUE));
         }
         externalTsFileResources.add(resource);
-        FileReaderManager.getInstance().increaseExternalFileReaderReference(externalTsFilePath);
       }
 
       this.sharedQueryDataSource =
@@ -1083,9 +1082,6 @@ public class FragmentInstanceContext extends QueryContext {
     }
 
     if (externalTsFileResources != null) {
-      for (TsFileResource tsFile : externalTsFileResources) {
-        FileReaderManager.getInstance().decreaseExternalFileReaderReference(tsFile.getTsFilePath());
-      }
       externalTsFileResources = null;
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java
index 79a101a6a79..3b4b2704103 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java
@@ -20,9 +20,11 @@
 package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 
 import org.apache.iotdb.commons.path.AlignedFullPath;
-import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanUtil;
+import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource;
+import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceOffset;
+import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.MultiWayMergeReader;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -32,7 +34,7 @@ import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
@@ -42,19 +44,21 @@ public class ExternalTsFileAggTableScanOperator extends DefaultAggTableScanOpera
       RamUsageEstimator.shallowSizeOfInstance(ExternalTsFileAggTableScanOperator.class);
 
   private final String tableName;
-  private final List<List<ExternalTsFileDeviceOffset>> deviceOffsets;
+  private final ExternalTsFileQueryResource externalTsFileQueryResource;
+  private final int deviceTaskPartitionIndex;
+  private MultiWayMergeReader deviceTaskReader;
+  private int loadedDeviceOffsetIndex = -1;
+  private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList();
 
   public ExternalTsFileAggTableScanOperator(
       AbstractAggTableScanOperatorParameter parameter,
       String tableName,
-      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
+      ExternalTsFileQueryResource externalTsFileQueryResource,
+      int deviceTaskPartitionIndex) {
     super(parameter);
     this.tableName = tableName;
-    this.deviceOffsets = new ArrayList<>(deviceOffsets);
-    if (deviceCount != this.deviceOffsets.size()) {
-      throw new IllegalArgumentException(
-          "The size of external TsFile device offsets should be equal to device entries");
-    }
+    this.externalTsFileQueryResource = externalTsFileQueryResource;
+    this.deviceTaskPartitionIndex = deviceTaskPartitionIndex;
   }
 
   @Override
@@ -91,21 +95,52 @@ public class ExternalTsFileAggTableScanOperator extends DefaultAggTableScanOpera
     if (deviceEntries.isEmpty() || currentDeviceIndex >= deviceEntries.size()) {
       return null;
     }
-    List<ExternalTsFileDeviceOffset> currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex);
     return ExternalTsFileSeriesScanUtil.loadTimeSeriesMetadata(
         resource,
         alignedPath,
         deviceEntries.get(currentDeviceIndex).getDeviceID(),
-        currentDeviceOffsets,
+        getCurrentDeviceOffsets(),
+        externalTsFileQueryResource.getTsFilePaths(),
         ((OperatorContext) operatorContext).getInstanceContext(),
         seriesScanOptions.getGlobalTimeFilter());
   }
 
+  private List<DeviceOffset> getCurrentDeviceOffsets() throws IOException {
+    if (loadedDeviceOffsetIndex == currentDeviceIndex) {
+      return currentDeviceOffsets;
+    }
+    if (deviceTaskReader == null) {
+      deviceTaskReader =
+          externalTsFileQueryResource.getMultiWayMergeReader(deviceTaskPartitionIndex);
+    }
+    DeviceEntry currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
+    while (deviceTaskReader.hasNextDevice()) {
+      DeviceEntry deviceEntry = deviceTaskReader.nextDevice();
+      if (deviceEntry.getDeviceID().equals(currentDeviceEntry.getDeviceID())) {
+        currentDeviceOffsets = deviceTaskReader.getCurrentDeviceOffsets();
+        loadedDeviceOffsetIndex = currentDeviceIndex;
+        return currentDeviceOffsets;
+      }
+    }
+    currentDeviceOffsets = Collections.emptyList();
+    loadedDeviceOffsetIndex = currentDeviceIndex;
+    return currentDeviceOffsets;
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (deviceTaskReader != null) {
+      deviceTaskReader.close();
+      deviceTaskReader = null;
+    }
+    super.close();
+  }
+
   @Override
   public long ramBytesUsed() {
     return super.ramBytesUsed()
         + INSTANCE_SIZE
         - AbstractDefaultAggTableScanOperator.INSTANCE_SIZE
-        + RamUsageEstimator.sizeOfCollection(deviceOffsets);
+        + RamUsageEstimator.sizeOfCollection(currentDeviceOffsets);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
index f46e3b04346..6ad6aa2af3f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
@@ -20,11 +20,11 @@
 package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 
 import org.apache.iotdb.commons.path.AlignedFullPath;
-import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil;
 import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceOffset;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -86,7 +86,8 @@ public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil {
       TsFileResource resource,
       AlignedFullPath alignedPath,
       IDeviceID currentDeviceID,
-      List<ExternalTsFileDeviceOffset> currentDeviceOffsets,
+      List<DeviceOffset> currentDeviceOffsets,
+      List<String> tsFilePaths,
       FragmentInstanceContext context,
       Filter globalTimeFilter)
       throws IOException {
@@ -95,7 +96,7 @@ public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil {
     }
 
     long[] deviceMeasurementNodeOffset =
-        getDeviceMeasurementNodeOffset(currentDeviceOffsets, resource.getTsFilePath());
+        getDeviceMeasurementNodeOffset(currentDeviceOffsets, tsFilePaths, resource.getTsFilePath());
     if (deviceMeasurementNodeOffset == null) {
       return null;
     }
@@ -111,10 +112,10 @@ public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil {
   }
 
   private static long[] getDeviceMeasurementNodeOffset(
-      List<ExternalTsFileDeviceOffset> currentDeviceOffsets, String tsFilePath) {
-    for (ExternalTsFileDeviceOffset offset : currentDeviceOffsets) {
-      if (tsFilePath.equals(offset.getTsFilePath())) {
-        return offset.getDeviceMeasurementNodeOffset();
+      List<DeviceOffset> currentDeviceOffsets, List<String> tsFilePaths, String tsFilePath) {
+    for (DeviceOffset offset : currentDeviceOffsets) {
+      if (tsFilePath.equals(tsFilePaths.get(offset.getFileIndex()))) {
+        return offset.getMeasurementNodeOffset();
       }
     }
     return null;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
index 2ad90b1d460..ad14540e291 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
@@ -20,8 +20,10 @@
 package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 
 import org.apache.iotdb.commons.path.AlignedFullPath;
-import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource;
+import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceOffset;
+import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.MultiWayMergeReader;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
@@ -29,7 +31,7 @@ import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata;
 import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 public class ExternalTsFileTableScanOperator extends TableScanOperator {
@@ -39,19 +41,21 @@ public class ExternalTsFileTableScanOperator extends TableScanOperator {
       RamUsageEstimator.shallowSizeOfInstance(AbstractDeviceTableScanOperator.class);
 
   private final String tableName;
-  private final List<List<ExternalTsFileDeviceOffset>> deviceOffsets;
+  private final ExternalTsFileQueryResource externalTsFileQueryResource;
+  private final int deviceTaskPartitionIndex;
+  private MultiWayMergeReader deviceTaskReader;
+  private int loadedDeviceOffsetIndex = -1;
+  private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList();
 
   public ExternalTsFileTableScanOperator(
       AbstractTableScanOperatorParameter parameter,
       String tableName,
-      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
+      ExternalTsFileQueryResource externalTsFileQueryResource,
+      int deviceTaskPartitionIndex) {
     super(parameter);
     this.tableName = tableName;
-    this.deviceOffsets = new ArrayList<>(deviceOffsets);
-    if (deviceCount != this.deviceOffsets.size()) {
-      throw new IllegalArgumentException(
-          "The size of external TsFile device offsets should be equal to device entries");
-    }
+    this.externalTsFileQueryResource = externalTsFileQueryResource;
+    this.deviceTaskPartitionIndex = deviceTaskPartitionIndex;
   }
 
   @Override
@@ -90,21 +94,52 @@ public class ExternalTsFileTableScanOperator extends TableScanOperator {
 
   private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata(
       TsFileResource resource, AlignedFullPath alignedPath) throws IOException {
-    List<ExternalTsFileDeviceOffset> currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex);
     return ExternalTsFileSeriesScanUtil.loadTimeSeriesMetadata(
         resource,
         alignedPath,
         getCurrentDeviceEntry().getDeviceID(),
-        currentDeviceOffsets,
+        getCurrentDeviceOffsets(),
+        externalTsFileQueryResource.getTsFilePaths(),
         ((OperatorContext) operatorContext).getInstanceContext(),
         seriesScanOptions.getGlobalTimeFilter());
   }
 
+  private List<DeviceOffset> getCurrentDeviceOffsets() throws IOException {
+    if (loadedDeviceOffsetIndex == currentDeviceIndex) {
+      return currentDeviceOffsets;
+    }
+    if (deviceTaskReader == null) {
+      deviceTaskReader =
+          externalTsFileQueryResource.getMultiWayMergeReader(deviceTaskPartitionIndex);
+    }
+    DeviceEntry currentDeviceEntry = getCurrentDeviceEntry();
+    while (deviceTaskReader.hasNextDevice()) {
+      DeviceEntry deviceEntry = deviceTaskReader.nextDevice();
+      if (deviceEntry.getDeviceID().equals(currentDeviceEntry.getDeviceID())) {
+        currentDeviceOffsets = deviceTaskReader.getCurrentDeviceOffsets();
+        loadedDeviceOffsetIndex = currentDeviceIndex;
+        return currentDeviceOffsets;
+      }
+    }
+    currentDeviceOffsets = Collections.emptyList();
+    loadedDeviceOffsetIndex = currentDeviceIndex;
+    return currentDeviceOffsets;
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (deviceTaskReader != null) {
+      deviceTaskReader.close();
+      deviceTaskReader = null;
+    }
+    super.close();
+  }
+
   @Override
   public long ramBytesUsed() {
     return super.ramBytesUsed()
         + INSTANCE_SIZE
         - ABSTRACT_DEVICE_TABLE_SCAN_OPERATOR_INSTANCE_SIZE
-        + RamUsageEstimator.sizeOfCollection(deviceOffsets);
+        + RamUsageEstimator.sizeOfCollection(currentDeviceOffsets);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index a5f96e3c6c0..0a921dd2dd8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -392,6 +392,7 @@ public class QueryExecution implements IQueryExecution {
       }
       cleanUpResultHandle();
     }
+    context.releaseExternalTsFileQueryResources();
   }
 
   /**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
index 85835931f24..b0fc9e1d554 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
@@ -1138,7 +1138,10 @@ public class DataNodeTableOperatorGenerator
 
     AbstractTableScanOperator externalTsFileTableScanOperator =
         new ExternalTsFileTableScanOperator(
-            parameter, node.getQualifiedObjectName().getObjectName(), node.getDeviceOffsets());
+            parameter,
+            node.getQualifiedObjectName().getObjectName(),
+            node.getExternalTsFileQueryResource(),
+            node.getDeviceTaskPartitionIndex());
 
     
context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName());
 
@@ -1638,7 +1641,10 @@ public class DataNodeTableOperatorGenerator
 
     ExternalTsFileAggTableScanOperator aggTableScanOperator =
         new ExternalTsFileAggTableScanOperator(
-            parameter, node.getQualifiedObjectName().getObjectName(), node.getDeviceOffsets());
+            parameter,
+            node.getQualifiedObjectName().getObjectName(),
+            node.getExternalTsFileQueryResource(),
+            node.getDeviceTaskPartitionIndex());
 
     
context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName());
     addSource(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java
index b79011de3ec..2a9b9b6905c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java
@@ -123,7 +123,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode;
-import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableDiskUsageInformationSchemaTableScanNode;
@@ -470,8 +469,6 @@ public class DataNodePlanNodeDeserializer extends CommonPlanNodeDeserializer {
         return AlignedAggregationTreeDeviceViewScanNode.deserialize(buffer);
       case 1042:
         return NonAlignedAggregationTreeDeviceViewScanNode.deserialize(buffer);
-      case 1043:
-        return ExternalTsFileScanNode.deserialize(buffer);
       case 2000:
         return RelationalInsertTabletNode.deserialize(buffer);
       case 2001:
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/DataNodeTableBuiltinTableFunction.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/DataNodeTableBuiltinTableFunction.java
index 858c2ab3926..8e4d0830d41 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/DataNodeTableBuiltinTableFunction.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/DataNodeTableBuiltinTableFunction.java
@@ -21,11 +21,10 @@ package org.apache.iotdb.db.queryengine.plan.relational.function;
 
 import org.apache.iotdb.commons.exception.SemanticException;
 import org.apache.iotdb.commons.i18n.QueryMessages;
-import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.ReadTsFileTableFunction;
+import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ReadTsFileTableFunction;
 import org.apache.iotdb.udf.api.relational.TableFunction;
 
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -43,10 +42,9 @@ public enum DataNodeTableBuiltinTableFunction {
   }
 
   private static final Set<String> BUILT_IN_TABLE_FUNCTION_NAME =
-      new HashSet<>(
-          Arrays.stream(DataNodeTableBuiltinTableFunction.values())
-              .map(DataNodeTableBuiltinTableFunction::getFunctionName)
-              .collect(Collectors.toList()));
+      Arrays.stream(DataNodeTableBuiltinTableFunction.values())
+          .map(DataNodeTableBuiltinTableFunction::getFunctionName)
+          .collect(Collectors.toSet());
 
   public static Set<String> getBuiltInTableFunctionName() {
     return BUILT_IN_TABLE_FUNCTION_NAME;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
new file mode 100644
index 00000000000..8371c6a1687
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile;
+
+import org.apache.iotdb.commons.schema.filter.SchemaFilter;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileDeviceFilterVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.LazyTsFileDeviceIterator;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.function.LongConsumer;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Query-scoped resource for external TsFile scans.
+ *
+ * <p>Reader instances are owned by {@link FileReaderManager}; this class balances the reference
+ * increments/decrements for the query's external TsFile paths. It also collects the device
+ * entries of those files, spills sorted device-task runs into the query temporary directory, and
+ * deletes that directory when the resource is closed.
+ */
+public class ExternalTsFileQueryResource implements AutoCloseable {
+
+  public static final String EXTERNAL_TSFILE_TMP_DIR = "external-tsfile";
+  private static final long DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES = 8L * 
1024 * 1024;
+  private static final long DEVICE_OFFSET_INSTANCE_SIZE_IN_BYTES = 32L;
+
+  private final QueryId queryId;
+  private final Path queryTempRoot;
+  private final String tableName;
+  private final List<String> tsFilePaths;
+  private final LongConsumer ioSizeRecorder;
+  private final List<DeviceEntry> deviceEntries = new ArrayList<>();
+  private List<DeviceTaskPartition> deviceTaskPartitions = 
Collections.emptyList();
+  private Comparator<DeviceEntry> deviceEntryComparator;
+
+  private boolean readersRetained;
+  private boolean closed;
+
+  /**
+   * Creates the resource for one query.
+   *
+   * @param queryId id of the owning query; must be non-null unless {@code useExactTempRoot} is set
+   * @param tempRoot base temporary directory, must not be null
+   * @param tableName table name handed to the device iterators
+   * @param tsFilePaths external TsFile paths; an unmodifiable copy is kept
+   * @param ioSizeRecorder callback passed on when readers and device iterators are created
+   * @param useExactTempRoot if {@code true}, {@code tempRoot} itself is used as the query
+   *     directory; otherwise the query id is appended as a sub-directory
+   */
+  public ExternalTsFileQueryResource(
+      QueryId queryId,
+      Path tempRoot,
+      String tableName,
+      List<String> tsFilePaths,
+      LongConsumer ioSizeRecorder,
+      boolean useExactTempRoot) {
+    this.queryId = queryId;
+    Path checkedTempRoot = requireNonNull(tempRoot, "tempRoot is null");
+    if (useExactTempRoot) {
+      this.queryTempRoot = checkedTempRoot;
+    } else {
+      this.queryTempRoot =
+          checkedTempRoot.resolve(requireNonNull(queryId, "queryId is null").getId());
+    }
+    this.tableName = tableName;
+    List<String> pathsCopy = new ArrayList<>(requireNonNull(tsFilePaths, "tsFilePaths"));
+    this.tsFilePaths = Collections.unmodifiableList(pathsCopy);
+    this.ioSizeRecorder = requireNonNull(ioSizeRecorder, "ioSizeRecorder is null");
+  }
+
+  /**
+   * Walks every device of the external TsFiles, keeps those accepted by {@code schemaFilter} and
+   * distributes them over {@code partitionCount} hash partitions whose device tasks are written
+   * to sorted run files.
+   *
+   * @param schemaFilter optional device filter; {@code null} accepts every device
+   * @param comparator device ordering; {@code null} falls back to device-id order
+   * @param partitionCount number of hash partitions, must be positive
+   * @throws IllegalStateException if the resource has already been closed
+   * @throws IllegalArgumentException if {@code partitionCount} is not positive
+   */
+  public synchronized void collectDeviceEntries(
+      SchemaFilter schemaFilter, Comparator<DeviceEntry> comparator, int partitionCount) {
+    checkNotClosed();
+    retainFileReaderReferences();
+    ExternalTsFileDeviceFilterVisitor filterVisitor = new ExternalTsFileDeviceFilterVisitor();
+    try (DeviceCollector collector = new DeviceCollector()) {
+      List<DeviceTaskPartition> buckets = createDeviceTaskPartitions(partitionCount);
+      while (collector.hasNextDevice()) {
+        IDeviceID deviceId = collector.nextDevice();
+        boolean accepted =
+            schemaFilter == null
+                || Boolean.TRUE.equals(schemaFilter.accept(filterVisitor, deviceId));
+        if (accepted) {
+          // The entry's position in deviceEntries is what gets persisted in the run files.
+          int entryIndex = deviceEntries.size();
+          deviceEntries.add(new AlignedDeviceEntry(deviceId, new Binary[0]));
+          List<DeviceOffset> offsets = new ArrayList<>(collector.getCurrentDeviceOffsets());
+          DeviceTaskPartition bucket =
+              buckets.get(Math.floorMod(deviceId.hashCode(), buckets.size()));
+          bucket.add(new DeviceTask(entryIndex, offsets));
+          // Spill once the in-memory estimate reaches the target bucket size.
+          if (bucket.getEstimatedSizeInBytes() >= DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES) {
+            bucket.flush(comparator);
+          }
+        }
+      }
+      deviceEntryComparator = comparator;
+      collectDeviceTaskPartitions(buckets, comparator);
+    }
+  }
+
+  /**
+   * Opens a reader that merges the run files of one device task partition.
+   *
+   * @param partitionIndex index of a partition returned by {@link #getDeviceTaskPartitions()}
+   * @return a reader the caller is responsible for closing
+   * @throws IllegalStateException if the resource has already been closed
+   * @throws IllegalArgumentException if no partition with that index exists
+   * @throws RuntimeException wrapping the {@link IOException} if a run file cannot be opened
+   */
+  public synchronized MultiWayMergeReader getMultiWayMergeReader(int partitionIndex) {
+    checkNotClosed();
+    List<Path> partitionRunFiles = getDeviceTaskPartition(partitionIndex).getRunFiles();
+    try {
+      return new DeviceTaskRunReader(partitionRunFiles, deviceEntries, deviceEntryComparator);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to create external TsFile device task run reader", e);
+    }
+  }
+
+  /** Returns the unmodifiable copy of the external TsFile paths taken in the constructor. */
+  public List<String> getTsFilePaths() {
+    return tsFilePaths;
+  }
+
+  /**
+   * Returns the internal list of collected device entries (not a copy). It is filled by {@link
+   * #collectDeviceEntries}; device tasks refer to entries by their index in this list.
+   */
+  public List<DeviceEntry> getDeviceEntries() {
+    return deviceEntries;
+  }
+
+  /**
+   * Returns the partitions that received at least one device task. The list is empty until
+   * {@link #collectDeviceEntries} has run.
+   */
+  public List<DeviceTaskPartition> getDeviceTaskPartitions() {
+    return deviceTaskPartitions;
+  }
+
+  /**
+   * Looks up a collected partition by its partition index (not by its list position).
+   *
+   * @throws IllegalArgumentException if no collected partition has that index
+   */
+  private DeviceTaskPartition getDeviceTaskPartition(int partitionIndex) {
+    return deviceTaskPartitions.stream()
+        .filter(candidate -> candidate.getPartitionIndex() == partitionIndex)
+        .findFirst()
+        .orElseThrow(
+            () ->
+                new IllegalArgumentException(
+                    "Unknown external TsFile device task partition: " + partitionIndex));
+  }
+
+  /**
+   * Releases the retained reader references and deletes the query temporary directory.
+   *
+   * <p>Idempotent: only the first call has an effect. The temporary directory is removed even
+   * when releasing a reader reference throws, because {@code closed} is already set at that point
+   * and a later call could not retry the cleanup.
+   */
+  @Override
+  public synchronized void close() {
+    if (closed) {
+      return;
+    }
+    closed = true;
+
+    try {
+      releaseFileReaderReferences();
+    } finally {
+      // Always remove the spill directory, even if releasing a reader reference failed.
+      if (Files.exists(queryTempRoot)) {
+        FileUtils.deleteFileOrDirectory(queryTempRoot.toFile(), true);
+      }
+    }
+  }
+
+  /**
+   * Increments the external reader reference of every TsFile path once per resource; repeated
+   * calls are no-ops while the references are held.
+   */
+  private void retainFileReaderReferences() {
+    if (!readersRetained) {
+      for (String path : tsFilePaths) {
+        FileReaderManager.getInstance().increaseExternalFileReaderReference(path);
+      }
+      readersRetained = true;
+    }
+  }
+
+  /**
+   * Decrements the external reader reference of every TsFile path, but only if they were
+   * previously retained by this resource.
+   */
+  private void releaseFileReaderReferences() {
+    if (readersRetained) {
+      for (String path : tsFilePaths) {
+        FileReaderManager.getInstance().decreaseExternalFileReaderReference(path);
+      }
+      readersRetained = false;
+    }
+  }
+
+  /**
+   * Guards entry points that must not run after {@link #close()}.
+   *
+   * @throws IllegalStateException if this resource has been closed
+   */
+  private void checkNotClosed() {
+    if (closed) {
+      throw new IllegalStateException("External TsFile query resource has been 
closed: " + queryId);
+    }
+  }
+
+  /**
+   * Iterator-like view over the device tasks of one partition, obtained from {@link
+   * #getMultiWayMergeReader(int)}.
+   */
+  public interface MultiWayMergeReader extends AutoCloseable {
+    /** Returns whether another device is available. */
+    boolean hasNextDevice() throws IOException;
+
+    /**
+     * Advances to the next device and returns its entry. The implementation in this file throws
+     * an {@code EOFException} when no device is left.
+     */
+    DeviceEntry nextDevice() throws IOException;
+
+    /**
+     * Returns the per-file offsets of the device most recently returned by {@link #nextDevice()}.
+     */
+    List<DeviceOffset> getCurrentDeviceOffsets();
+
+    // Narrows AutoCloseable#close so callers only have to handle IOException.
+    @Override
+    void close() throws IOException;
+  }
+
+  /**
+   * One hash partition of device tasks. Tasks are buffered in memory, and each {@code flush}
+   * sorts the buffer and spills it as one run file below the query temporary directory.
+   */
+  public class DeviceTaskPartition {
+
+    private final int partitionIndex;
+    // Tasks added since the last flush.
+    private final List<DeviceTask> pendingDeviceTasks = new ArrayList<>();
+    // Indexes into the enclosing resource's deviceEntries for every flushed task.
+    private final List<Integer> deviceEntryIndexes = new ArrayList<>();
+    private final List<Path> runFiles = new ArrayList<>();
+    private long estimatedSizeInBytes;
+
+    private DeviceTaskPartition(int partitionIndex) {
+      this.partitionIndex = partitionIndex;
+    }
+
+    /** Returns the index this partition was created with. */
+    public int getPartitionIndex() {
+      return partitionIndex;
+    }
+
+    /** Returns the internal list of device-entry indexes of all flushed tasks (not a copy). */
+    public List<Integer> getDeviceEntryIndexes() {
+      return deviceEntryIndexes;
+    }
+
+    /** Buffers a task and adds its estimated size to the running estimate. */
+    private void add(DeviceTask deviceTask) {
+      pendingDeviceTasks.add(deviceTask);
+      estimatedSizeInBytes += estimateDeviceTaskSize(deviceTask);
+    }
+
+    /**
+     * Sorts the buffered tasks, writes them as the next run file of this partition and resets the
+     * buffer. Does nothing when no task is buffered.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} if the run file cannot be written
+     */
+    private void flush(Comparator<DeviceEntry> comparator) {
+      if (pendingDeviceTasks.isEmpty()) {
+        return;
+      }
+      sortPendingDeviceTasks(comparator);
+      Path runRoot = queryTempRoot.resolve("child-" + partitionIndex);
+      Path runFile;
+      try {
+        runFile = writeDeviceTaskRun(runRoot, runFiles.size(), pendingDeviceTasks);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to flush external TsFile device task partition", e);
+      }
+      runFiles.add(runFile);
+      pendingDeviceTasks.forEach(task -> deviceEntryIndexes.add(task.deviceEntryIndex));
+      pendingDeviceTasks.clear();
+      estimatedSizeInBytes = 0;
+    }
+
+    /** Orders the buffered tasks by the device entries they refer to. */
+    private void sortPendingDeviceTasks(Comparator<DeviceEntry> comparator) {
+      Comparator<DeviceEntry> entryOrder = entryOrderOf(comparator);
+      pendingDeviceTasks.sort(
+          (left, right) ->
+              entryOrder.compare(
+                  deviceEntries.get(left.deviceEntryIndex),
+                  deviceEntries.get(right.deviceEntryIndex)));
+    }
+
+    private long getEstimatedSizeInBytes() {
+      return estimatedSizeInBytes;
+    }
+
+    /** Returns whether at least one task has been flushed into this partition. */
+    private boolean hasDeviceTasks() {
+      return !deviceEntryIndexes.isEmpty();
+    }
+
+    /** Orders the flushed device-entry indexes by the device entries they refer to. */
+    private void sortDeviceEntries(Comparator<DeviceEntry> comparator) {
+      Comparator<DeviceEntry> entryOrder = entryOrderOf(comparator);
+      deviceEntryIndexes.sort(
+          (left, right) ->
+              entryOrder.compare(deviceEntries.get(left), deviceEntries.get(right)));
+    }
+
+    /** Returns {@code comparator}, or device-id order when it is {@code null}. */
+    private Comparator<DeviceEntry> entryOrderOf(Comparator<DeviceEntry> comparator) {
+      if (comparator != null) {
+        return comparator;
+      }
+      return (left, right) -> left.getDeviceID().compareTo(right.getDeviceID());
+    }
+
+    private List<Path> getRunFiles() {
+      return runFiles;
+    }
+  }
+
+  /**
+   * Creates {@code partitionCount} empty partitions with indexes {@code 0..partitionCount-1}.
+   *
+   * @throws IllegalArgumentException if {@code partitionCount} is not positive
+   */
+  private List<DeviceTaskPartition> createDeviceTaskPartitions(int partitionCount) {
+    if (partitionCount < 1) {
+      throw new IllegalArgumentException(
+          "External TsFile device task partition count must be positive");
+    }
+    List<DeviceTaskPartition> created = new ArrayList<>(partitionCount);
+    int nextIndex = 0;
+    while (nextIndex < partitionCount) {
+      created.add(new DeviceTaskPartition(nextIndex));
+      nextIndex++;
+    }
+    return created;
+  }
+
+  /**
+   * Flushes the remaining buffered tasks of every partition and publishes the partitions that
+   * ended up with at least one device task, each with its device-entry indexes sorted.
+   */
+  private void collectDeviceTaskPartitions(
+      List<DeviceTaskPartition> partitions, Comparator<DeviceEntry> comparator) {
+    if (partitions.isEmpty()) {
+      deviceTaskPartitions = Collections.emptyList();
+      return;
+    }
+    List<DeviceTaskPartition> populated = new ArrayList<>(partitions.size());
+    for (DeviceTaskPartition candidate : partitions) {
+      // Spill whatever is still buffered before deciding whether the partition is empty.
+      candidate.flush(comparator);
+      if (candidate.hasDeviceTasks()) {
+        candidate.sortDeviceEntries(comparator);
+        populated.add(candidate);
+      }
+    }
+    deviceTaskPartitions = populated;
+  }
+
+  /**
+   * Writes one run file {@code run-<runIndex>.bin} under {@code runRoot}: the task count followed
+   * by every serialized task, in list order.
+   *
+   * @return the path of the written run file
+   * @throws IOException if the directory or the file cannot be written
+   */
+  private Path writeDeviceTaskRun(Path runRoot, int runIndex, List<DeviceTask> deviceTasks)
+      throws IOException {
+    Files.createDirectories(runRoot);
+    Path target = runRoot.resolve("run-" + runIndex + ".bin");
+    try (DataOutputStream sink =
+        new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(target)))) {
+      ReadWriteIOUtils.write(deviceTasks.size(), sink);
+      for (DeviceTask task : deviceTasks) {
+        task.serialize(sink);
+      }
+    }
+    return target;
+  }
+
+  /**
+   * Estimates the size in bytes of a device task: a fixed 64-byte base plus, per device offset,
+   * a fixed instance size and eight bytes for each measurement node offset.
+   */
+  private static long estimateDeviceTaskSize(DeviceTask deviceTask) {
+    long total = 64L;
+    for (DeviceOffset deviceOffset : deviceTask.deviceOffsets) {
+      long offsetArrayBytes = (long) Long.BYTES * deviceOffset.measurementNodeOffset.length;
+      total += DEVICE_OFFSET_INSTANCE_SIZE_IN_BYTES + offsetArrayBytes;
+    }
+    return total;
+  }
+
+  /**
+   * Merges the sorted run files of one partition into a single ordered stream of device tasks.
+   *
+   * <p>One cursor per non-empty run file sits in a priority queue ordered by the cursor's current
+   * device entry; the head of the queue is always the next device to return.
+   */
+  private static class DeviceTaskRunReader implements MultiWayMergeReader {
+
+    private final List<DeviceEntry> deviceEntries;
+    private final PriorityQueue<DeviceTaskRunCursor> runCursors;
+    // Task fetched by hasNextDevice() but not yet handed out by nextDevice().
+    private DeviceTask nextDeviceTask;
+    private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList();
+
+    /**
+     * Opens a cursor on every run file.
+     *
+     * @param runFiles run files of one partition
+     * @param deviceEntries list the persisted device-entry indexes refer to
+     * @param comparator device ordering used when the runs were written; {@code null} means
+     *     device-id order
+     * @throws IOException if a run file cannot be opened or read; cursors opened so far are
+     *     closed before the exception propagates
+     */
+    private DeviceTaskRunReader(
+        List<Path> runFiles, List<DeviceEntry> deviceEntries, Comparator<DeviceEntry> comparator)
+        throws IOException {
+      this.deviceEntries = deviceEntries;
+      Comparator<DeviceTaskRunCursor> cursorComparator =
+          (left, right) ->
+              comparator == null
+                  ? left.getCurrentDeviceEntry()
+                      .getDeviceID()
+                      .compareTo(right.getCurrentDeviceEntry().getDeviceID())
+                  : comparator.compare(left.getCurrentDeviceEntry(), right.getCurrentDeviceEntry());
+      this.runCursors = new PriorityQueue<>(cursorComparator);
+      try {
+        for (Path runFile : runFiles) {
+          DeviceTaskRunCursor cursor = new DeviceTaskRunCursor(runFile, deviceEntries);
+          if (cursor.hasCurrentDeviceTask()) {
+            runCursors.add(cursor);
+          } else {
+            cursor.close();
+          }
+        }
+      } catch (IOException | RuntimeException e) {
+        // The caller never receives this reader, so the cursors opened so far must be closed
+        // here or their file handles would leak.
+        closeAfterFailure(e);
+        throw e;
+      }
+    }
+
+    @Override
+    public boolean hasNextDevice() throws IOException {
+      if (nextDeviceTask != null) {
+        return true;
+      }
+      nextDeviceTask = readNextDeviceTask();
+      return nextDeviceTask != null;
+    }
+
+    @Override
+    public DeviceEntry nextDevice() throws IOException {
+      if (!hasNextDevice()) {
+        throw new EOFException("No more external TsFile device task");
+      }
+      DeviceTask deviceTask = nextDeviceTask;
+      nextDeviceTask = null;
+      currentDeviceOffsets = deviceTask.deviceOffsets;
+      return deviceEntries.get(deviceTask.deviceEntryIndex);
+    }
+
+    @Override
+    public List<DeviceOffset> getCurrentDeviceOffsets() {
+      return currentDeviceOffsets;
+    }
+
+    /**
+     * Closes every cursor still queued. All cursors are attempted; the first failure is thrown
+     * with later ones attached as suppressed exceptions.
+     */
+    @Override
+    public void close() throws IOException {
+      IOException exception = null;
+      while (!runCursors.isEmpty()) {
+        try {
+          runCursors.poll().close();
+        } catch (IOException e) {
+          if (exception == null) {
+            exception = e;
+          } else {
+            exception.addSuppressed(e);
+          }
+        }
+      }
+      if (exception != null) {
+        throw exception;
+      }
+    }
+
+    /** Closes all queued cursors and attaches any close failure to {@code failure}. */
+    private void closeAfterFailure(Throwable failure) {
+      try {
+        close();
+      } catch (IOException closeFailure) {
+        failure.addSuppressed(closeFailure);
+      }
+    }
+
+    /**
+     * Takes the smallest current task off the queue and advances its cursor.
+     *
+     * @return the next task in merge order, or {@code null} when every run is exhausted
+     */
+    private DeviceTask readNextDeviceTask() throws IOException {
+      if (runCursors.isEmpty()) {
+        return null;
+      }
+      DeviceTaskRunCursor cursor = runCursors.poll();
+      DeviceTask result = cursor.getCurrentDeviceTask();
+      try {
+        cursor.advance();
+      } catch (IOException | RuntimeException e) {
+        // The cursor has already left the queue, so close() would no longer reach it.
+        try {
+          cursor.close();
+        } catch (IOException closeFailure) {
+          e.addSuppressed(closeFailure);
+        }
+        throw e;
+      }
+      if (cursor.hasCurrentDeviceTask()) {
+        runCursors.add(cursor);
+      } else {
+        cursor.close();
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Sequential cursor over one run file: reads the task count from the header, then exposes one
+   * deserialized device task at a time.
+   */
+  private static class DeviceTaskRunCursor implements Closeable {
+
+    private final List<DeviceEntry> deviceEntries;
+    private final DataInputStream inputStream;
+    // Number of tasks in the file that have not been deserialized yet.
+    private int remainingDeviceTasks;
+    private DeviceTask currentDeviceTask;
+
+    /**
+     * Opens the run file and positions the cursor on its first task, if any.
+     *
+     * @throws IOException if the file cannot be opened or read; the stream is closed before the
+     *     exception propagates
+     */
+    private DeviceTaskRunCursor(Path runFile, List<DeviceEntry> deviceEntries) throws IOException {
+      this.deviceEntries = deviceEntries;
+      this.inputStream =
+          new DataInputStream(new BufferedInputStream(Files.newInputStream(runFile)));
+      try {
+        this.remainingDeviceTasks = ReadWriteIOUtils.readInt(inputStream);
+        advance();
+      } catch (IOException | RuntimeException e) {
+        // The caller never receives this instance, so nobody else could close the stream.
+        try {
+          inputStream.close();
+        } catch (IOException closeFailure) {
+          e.addSuppressed(closeFailure);
+        }
+        throw e;
+      }
+    }
+
+    /** Moves to the next task, or clears the current task when the run is exhausted. */
+    private void advance() throws IOException {
+      if (remainingDeviceTasks <= 0) {
+        currentDeviceTask = null;
+        return;
+      }
+      remainingDeviceTasks--;
+      currentDeviceTask = DeviceTask.deserialize(inputStream);
+    }
+
+    private boolean hasCurrentDeviceTask() {
+      return currentDeviceTask != null;
+    }
+
+    private DeviceTask getCurrentDeviceTask() {
+      return currentDeviceTask;
+    }
+
+    /** Returns the device entry of the current task; requires a current task to be present. */
+    private DeviceEntry getCurrentDeviceEntry() {
+      return deviceEntries.get(currentDeviceTask.deviceEntryIndex);
+    }
+
+    @Override
+    public void close() throws IOException {
+      inputStream.close();
+    }
+  }
+
+  /**
+   * Merges the device iterators of all external TsFiles so that every distinct device is
+   * returned once, together with the offsets of that device in each file that contains it.
+   *
+   * <p>NOTE(review): picking the minimum current device only yields each device exactly once if
+   * every {@code LazyTsFileDeviceIterator} returns its devices in ascending {@code IDeviceID}
+   * order - confirm against that class.
+   */
+  private class DeviceCollector implements Closeable {
+
+    // Keyed by the file's index in tsFilePaths; exhausted iterators are removed in nextDevice().
+    private final Map<Integer, LazyTsFileDeviceIterator> deviceIteratorMap = 
new HashMap<>();
+
+    // Device returned by the most recent nextDevice() call; null before the first call.
+    private IDeviceID currentDevice;
+    private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList();
+
+    /**
+     * Obtains a reader for every TsFile path from {@link FileReaderManager} and wraps it in a
+     * device iterator. An {@link IOException} is rethrown as a {@link RuntimeException} after
+     * the collector's state has been cleared.
+     */
+    private DeviceCollector() {
+      try {
+        for (int fileIndex = 0; fileIndex < tsFilePaths.size(); fileIndex++) {
+          TsFileSequenceReader reader =
+              FileReaderManager.getInstance()
+                  .get(tsFilePaths.get(fileIndex), null, true, ioSizeRecorder, 
true);
+          deviceIteratorMap.put(
+              fileIndex, new LazyTsFileDeviceIterator(reader, tableName, 
ioSizeRecorder));
+        }
+      } catch (IOException e) {
+        close();
+        throw new RuntimeException("Failed to create external TsFile device 
collector", e);
+      }
+    }
+
+    /**
+     * Returns whether any file still has a device that has not been returned yet: either the
+     * iterator can advance, or its current device differs from the last returned one.
+     */
+    private boolean hasNextDevice() {
+      for (LazyTsFileDeviceIterator deviceIterator : 
deviceIteratorMap.values()) {
+        if (deviceIterator.hasNext()
+            || (deviceIterator.hasCurrent()
+                && 
!deviceIterator.getCurrentDeviceID().equals(currentDevice))) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Advances every iterator that is still positioned on the previously returned device (or has
+     * no current device), drops exhausted iterators, and returns the smallest current device
+     * across the remaining files. Returns {@code null} if no iterator has a device left.
+     */
+    private IDeviceID nextDevice() {
+      IDeviceID minDevice = null;
+      Iterator<Map.Entry<Integer, LazyTsFileDeviceIterator>> iterator =
+          deviceIteratorMap.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<Integer, LazyTsFileDeviceIterator> entry = iterator.next();
+        LazyTsFileDeviceIterator deviceIterator = entry.getValue();
+        IDeviceID currentFileDevice = null;
+        if (deviceIterator.hasCurrent()) {
+          currentFileDevice = deviceIterator.getCurrentDeviceID();
+        }
+        // This file's current device was already consumed (or it has none yet): move it forward.
+        if (currentFileDevice == null || 
currentFileDevice.equals(currentDevice)) {
+          if (deviceIterator.hasNext()) {
+            currentFileDevice = deviceIterator.next();
+          } else {
+            iterator.remove();
+            continue;
+          }
+        }
+        if (minDevice == null || minDevice.compareTo(currentFileDevice) > 0) {
+          minDevice = currentFileDevice;
+        }
+      }
+      currentDevice = minDevice;
+      collectCurrentDeviceOffsets();
+      return currentDevice;
+    }
+
+    /**
+     * Rebuilds the offset list with one entry per file whose iterator is currently positioned on
+     * {@code currentDevice}.
+     */
+    private void collectCurrentDeviceOffsets() {
+      List<DeviceOffset> deviceOffsets = new ArrayList<>();
+      for (Map.Entry<Integer, LazyTsFileDeviceIterator> entry : 
deviceIteratorMap.entrySet()) {
+        LazyTsFileDeviceIterator deviceIterator = entry.getValue();
+        if (currentDevice != null
+            && deviceIterator.hasCurrent()
+            && currentDevice.equals(deviceIterator.getCurrentDeviceID())) {
+          deviceOffsets.add(
+              new DeviceOffset(
+                  entry.getKey(), 
deviceIterator.getCurrentDeviceMeasurementNodeOffset()));
+        }
+      }
+      currentDeviceOffsets = deviceOffsets;
+    }
+
+    /** Returns the offsets collected for the device returned by the last nextDevice() call. */
+    private List<DeviceOffset> getCurrentDeviceOffsets() {
+      return currentDeviceOffsets;
+    }
+
+    /** Drops the iterators and offsets held by this collector; no reader is closed here. */
+    @Override
+    public void close() {
+      deviceIteratorMap.clear();
+      currentDeviceOffsets = Collections.emptyList();
+    }
+  }
+
+  /**
+   * A device (referenced by its index in the resource's device-entry list) together with the
+   * offsets of that device in each file, plus the binary format used for run files.
+   */
+  private static class DeviceTask {
+
+    private final int deviceEntryIndex;
+    private final List<DeviceOffset> deviceOffsets;
+
+    private DeviceTask(int deviceEntryIndex, List<DeviceOffset> deviceOffsets) {
+      this.deviceEntryIndex = deviceEntryIndex;
+      this.deviceOffsets = deviceOffsets;
+    }
+
+    /**
+     * Writes the task as: entry index, offset count, then per offset the file index, the number
+     * of measurement node offsets and the offsets themselves.
+     */
+    private void serialize(DataOutputStream outputStream) throws IOException {
+      ReadWriteIOUtils.write(deviceEntryIndex, outputStream);
+      ReadWriteIOUtils.write(deviceOffsets.size(), outputStream);
+      for (DeviceOffset deviceOffset : deviceOffsets) {
+        long[] nodeOffsets = deviceOffset.measurementNodeOffset;
+        ReadWriteIOUtils.write(deviceOffset.fileIndex, outputStream);
+        ReadWriteIOUtils.write(nodeOffsets.length, outputStream);
+        for (int i = 0; i < nodeOffsets.length; i++) {
+          ReadWriteIOUtils.write(nodeOffsets[i], outputStream);
+        }
+      }
+    }
+
+    /** Reads one task in the layout produced by {@code serialize}. */
+    private static DeviceTask deserialize(DataInputStream inputStream) throws IOException {
+      int entryIndex = ReadWriteIOUtils.readInt(inputStream);
+      int offsetCount = ReadWriteIOUtils.readInt(inputStream);
+      List<DeviceOffset> restored = new ArrayList<>(offsetCount);
+      int remaining = offsetCount;
+      while (remaining > 0) {
+        int fileIndex = ReadWriteIOUtils.readInt(inputStream);
+        int nodeOffsetCount = ReadWriteIOUtils.readInt(inputStream);
+        long[] nodeOffsets = new long[nodeOffsetCount];
+        for (int j = 0; j < nodeOffsetCount; j++) {
+          nodeOffsets[j] = inputStream.readLong();
+        }
+        restored.add(new DeviceOffset(fileIndex, nodeOffsets));
+        remaining--;
+      }
+      return new DeviceTask(entryIndex, restored);
+    }
+  }
+
+  /** Location of one device inside one external TsFile. */
+  public static class DeviceOffset {
+
+    // Index of the file in the resource's tsFilePaths list.
+    private final int fileIndex;
+    // Offsets reported by the file's device iterator for this device.
+    private final long[] measurementNodeOffset;
+
+    private DeviceOffset(int fileIndex, long[] measurementNodeOffset) {
+      this.fileIndex = fileIndex;
+      this.measurementNodeOffset = measurementNodeOffset;
+    }
+
+    /** Returns the index of the file in the resource's TsFile path list. */
+    public int getFileIndex() {
+      return fileIndex;
+    }
+
+    /** Returns the internal offset array itself (not a copy). */
+    public long[] getMeasurementNodeOffset() {
+      return measurementNodeOffset;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ReadTsFileTableFunction.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ReadTsFileTableFunction.java
similarity index 95%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ReadTsFileTableFunction.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ReadTsFileTableFunction.java
index 71ed1b3670f..26f7143595d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ReadTsFileTableFunction.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ReadTsFileTableFunction.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.plan.relational.function.tvf;
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile;
 
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
@@ -118,7 +118,7 @@ public class ReadTsFileTableFunction implements 
TableFunction {
   public TableFunctionProcessorProvider getProcessorProvider(
       TableFunctionHandle tableFunctionHandle) {
     throw new UnsupportedOperationException(
-        "read_tsfile must be planned as an ExternalTsFileScanNode");
+        "readTsFile must be planned as an ExternalTsFileScanNode");
   }
 
   private static String getRequiredStringArgument(Map<String, Argument> 
arguments, String name) {
@@ -172,7 +172,7 @@ public class ReadTsFileTableFunction implements 
TableFunction {
         if (normalizedTsFilePath.startsWith(dataDir) || 
dataDir.startsWith(normalizedTsFilePath)) {
           throw new UDFArgumentNotValidException(
               String.format(
-                  "read_tsfile path %s is not allowed because it may access 
IoTDB data directory %s",
+                  "readTsFile path %s is not allowed because it may access 
IoTDB data directory %s",
                   tsFilePath, dataDir));
         }
       }
@@ -402,30 +402,6 @@ public class ReadTsFileTableFunction implements 
TableFunction {
     return builder.build();
   }
 
-  public static class ExternalTsFileDeviceOffset {
-
-    private final String tsFilePath;
-    private final long[] deviceMeasurementNodeOffset;
-
-    public ExternalTsFileDeviceOffset(String tsFilePath, long[] 
deviceMeasurementNodeOffset) {
-      this.tsFilePath = tsFilePath;
-      this.deviceMeasurementNodeOffset =
-          deviceMeasurementNodeOffset == null
-              ? null
-              : Arrays.copyOf(deviceMeasurementNodeOffset, 
deviceMeasurementNodeOffset.length);
-    }
-
-    public String getTsFilePath() {
-      return tsFilePath;
-    }
-
-    public long[] getDeviceMeasurementNodeOffset() {
-      return deviceMeasurementNodeOffset == null
-          ? null
-          : Arrays.copyOf(deviceMeasurementNodeOffset, 
deviceMeasurementNodeOffset.length);
-    }
-  }
-
   public static class ReadTsFileTableFunctionHandle implements 
TableFunctionHandle {
     private String tableName;
     private List<String> tsFilePaths;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 5855b15922f..8fdf2c1aa1b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -115,7 +115,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.Scope;
 import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.tablefunction.TableArgumentAnalysis;
 import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.tablefunction.TableFunctionInvocationAnalysis;
 import 
org.apache.iotdb.db.queryengine.plan.relational.function.DataNodeTableBuiltinTableFunction;
-import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.ReadTsFileTableFunction;
+import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ReadTsFileTableFunction;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.TreeDeviceViewSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils;
@@ -1596,7 +1596,7 @@ public class RelationPlanner implements 
AstVisitor<RelationPlan, Void> {
       TableFunctionInvocation node, TableFunctionInvocationAnalysis 
functionAnalysis) {
     if (!(functionAnalysis.getTableFunctionHandle()
         instanceof ReadTsFileTableFunction.ReadTsFileTableFunctionHandle)) {
-      throw new IllegalStateException("read_tsfile table function handle is 
invalid");
+      throw new IllegalStateException("readTsFile table function handle is 
invalid");
     }
 
     ReadTsFileTableFunction.ReadTsFileTableFunctionHandle handle =
@@ -1607,17 +1607,20 @@ public class RelationPlanner implements 
AstVisitor<RelationPlan, Void> {
 
     ImmutableList.Builder<Symbol> outputSymbolsBuilder = 
ImmutableList.builder();
     ImmutableMap.Builder<Symbol, ColumnSchema> assignmentsBuilder = 
ImmutableMap.builder();
+    Map<Symbol, Integer> tagAndAttributeIndexMap = new HashMap<>();
+    int tagIndex = 0;
     for (int i = 0; i < relationType.getAllFieldCount(); i++) {
       Field field = relationType.getFieldByIndex(i);
       Symbol symbol = symbolAllocator.newSymbol(field);
+      TsTableColumnCategory columnCategory = 
handle.getOutputColumnCategories().get(i);
       outputSymbolsBuilder.add(symbol);
       assignmentsBuilder.put(
           symbol,
           new ColumnSchema(
-              field.getName().orElse(null),
-              field.getType(),
-              field.isHidden(),
-              handle.getOutputColumnCategories().get(i)));
+              field.getName().orElse(null), field.getType(), field.isHidden(), 
columnCategory));
+      if (columnCategory == TsTableColumnCategory.TAG) {
+        tagAndAttributeIndexMap.put(symbol, tagIndex++);
+      }
     }
 
     List<Symbol> outputSymbols = outputSymbolsBuilder.build();
@@ -1626,16 +1629,17 @@ public class RelationPlanner implements 
AstVisitor<RelationPlan, Void> {
         createExternalTsFileQualifiedObjectName(handle.getTableName());
     analysis.addTableSchema(qualifiedObjectName, assignments);
 
-    return new RelationPlan(
+    ExternalTsFileScanNode scanNode =
         new ExternalTsFileScanNode(
             idAllocator.genPlanNodeId(),
             qualifiedObjectName,
             outputSymbols,
             assignments,
-            handle.getTsFilePaths()),
-        scope,
-        outputSymbols,
-        outerContext);
+            tagAndAttributeIndexMap,
+            queryContext.createExternalTsFileQueryResource(
+                handle.getTableName(), handle.getTsFilePaths()));
+
+    return new RelationPlan(scanNode, scope, outputSymbols, outerContext);
   }
 
   private QualifiedObjectName createExternalTsFileQualifiedObjectName(String 
tableName) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 105dc1c1935..0b54b2395a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -73,7 +73,6 @@ import 
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.SymbolRefere
 import org.apache.iotdb.commons.schema.table.InformationSchema;
 import org.apache.iotdb.commons.schema.table.TsTable;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
-import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -88,6 +87,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.exceptions.RootFIPlacementEx
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceTaskPartition;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
@@ -737,46 +737,22 @@ public class TableDistributedPlanGenerator
         new TRegionReplicaSet(null, 
ImmutableList.of(DataNodeEndPoints.getLocalDataNodeLocation()));
     node.setRegionReplicaSet(localRegionReplicaSet);
     context.mostUsedRegion = node.getRegionReplicaSet();
-    List<PlanNode> resultNodes =
-        splitExternalTsFileScanByDeviceEntries(node, localRegionReplicaSet);
-    if (context.hasSortProperty) {
-      processSortProperty(node, resultNodes, context);
-    }
-    return resultNodes;
-  }
-
-  private List<PlanNode> splitExternalTsFileScanByDeviceEntries(
-      final ExternalTsFileScanNode node, final TRegionReplicaSet 
localRegionReplicaSet) {
-    List<DeviceEntry> deviceEntries = node.getDeviceEntries();
-    if (deviceEntries.size() <= 1) {
-      return Collections.singletonList(node);
-    }
-
-    int splitCount =
-        Math.min(
-            deviceEntries.size(),
-            
IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism());
-    if (splitCount <= 1) {
+    Optional<SortPropertyContext> sortPropertyContext =
+        context.hasSortProperty ? analyzeSortProperty(node, context) : 
Optional.empty();
+    int partitionCount = 
IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism();
+    node.getExternalTsFileQueryResource()
+        .collectDeviceEntries(
+            node.getSchemaFilter(),
+            sortPropertyContext.map(propertyContext -> 
propertyContext.comparator).orElse(null),
+            partitionCount);
+
+    List<DeviceTaskPartition> partitions =
+        node.getExternalTsFileQueryResource().getDeviceTaskPartitions();
+    if (partitions.isEmpty()) {
       return Collections.singletonList(node);
     }
-
-    List<List<DeviceEntry>> splitDeviceEntries = new ArrayList<>(splitCount);
-    List<List<List<ExternalTsFileDeviceOffset>>> splitDeviceOffsets = new 
ArrayList<>(splitCount);
-    for (int i = 0; i < splitCount; i++) {
-      splitDeviceEntries.add(new ArrayList<>());
-      splitDeviceOffsets.add(new ArrayList<>());
-    }
-    for (int i = 0; i < deviceEntries.size(); i++) {
-      splitDeviceEntries.get(i % splitCount).add(deviceEntries.get(i));
-      splitDeviceOffsets.get(i % 
splitCount).add(node.getDeviceOffsets().get(i));
-    }
-
-    List<PlanNode> result = new ArrayList<>(splitCount);
-    for (int i = 0; i < splitDeviceEntries.size(); i++) {
-      List<DeviceEntry> entries = splitDeviceEntries.get(i);
-      if (entries.isEmpty()) {
-        continue;
-      }
+    List<PlanNode> result = new ArrayList<>(partitions.size());
+    for (DeviceTaskPartition partition : partitions) {
       ExternalTsFileScanNode splitNode =
           new ExternalTsFileScanNode(
               queryId.genPlanNodeId(),
@@ -789,12 +765,16 @@ public class TableDistributedPlanGenerator
               node.getTimePredicate().orElse(null),
               node.getScanOrder(),
               node.isPushLimitToEachDevice(),
-              node.getTsFilePaths(),
-              entries,
-              splitDeviceOffsets.get(i));
+              node.getTagAndAttributeIndexMap(),
+              node.getExternalTsFileQueryResource(),
+              partition.getDeviceEntryIndexes(),
+              partition.getPartitionIndex(),
+              node.getSchemaFilter());
       splitNode.setRegionReplicaSet(localRegionReplicaSet);
       result.add(splitNode);
     }
+    sortPropertyContext.ifPresent(
+        propertyContext -> applySortProperty(node, result, propertyContext, 
false));
     return result;
   }
 
@@ -805,53 +785,28 @@ public class TableDistributedPlanGenerator
         new TRegionReplicaSet(null, 
ImmutableList.of(DataNodeEndPoints.getLocalDataNodeLocation()));
     node.setRegionReplicaSet(localRegionReplicaSet);
     context.mostUsedRegion = node.getRegionReplicaSet();
-    List<PlanNode> resultNodes =
-        splitExternalTsFileAggregationScanByDeviceEntries(node, 
localRegionReplicaSet);
-    if (context.hasSortProperty) {
-      processSortProperty(node, resultNodes, context);
-    }
-    return resultNodes;
-  }
-
-  private List<PlanNode> splitExternalTsFileAggregationScanByDeviceEntries(
-      final ExternalTsFileAggregationScanNode node, final TRegionReplicaSet 
localRegionReplicaSet) {
-    List<DeviceEntry> deviceEntries = node.getDeviceEntries();
-    if (deviceEntries.size() <= 1) {
-      return Collections.singletonList(node);
-    }
-
-    int splitCount =
-        Math.min(
-            deviceEntries.size(),
-            
IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism());
-    if (splitCount <= 1) {
+    Optional<SortPropertyContext> sortPropertyContext =
+        context.hasSortProperty ? analyzeSortProperty(node, context) : 
Optional.empty();
+    int partitionCount = 
IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism();
+    node.getExternalTsFileQueryResource()
+        .collectDeviceEntries(
+            node.getSchemaFilter(),
+            sortPropertyContext.map(propertyContext -> 
propertyContext.comparator).orElse(null),
+            partitionCount);
+
+    List<DeviceTaskPartition> partitions =
+        node.getExternalTsFileQueryResource().getDeviceTaskPartitions();
+    if (partitions.isEmpty()) {
       return Collections.singletonList(node);
     }
-
-    List<List<DeviceEntry>> splitDeviceEntries = new ArrayList<>(splitCount);
-    List<List<List<ExternalTsFileDeviceOffset>>> splitDeviceOffsets = new 
ArrayList<>(splitCount);
-    for (int i = 0; i < splitCount; i++) {
-      splitDeviceEntries.add(new ArrayList<>());
-      splitDeviceOffsets.add(new ArrayList<>());
-    }
-    for (int i = 0; i < deviceEntries.size(); i++) {
-      splitDeviceEntries.get(i % splitCount).add(deviceEntries.get(i));
-      splitDeviceOffsets.get(i % 
splitCount).add(node.getDeviceOffsets().get(i));
-    }
-
-    List<PlanNode> result = new ArrayList<>(splitCount);
-    for (int i = 0; i < splitDeviceEntries.size(); i++) {
-      List<DeviceEntry> entries = splitDeviceEntries.get(i);
-      if (entries.isEmpty()) {
-        continue;
-      }
+    List<PlanNode> result = new ArrayList<>(partitions.size());
+    for (DeviceTaskPartition partition : partitions) {
       ExternalTsFileAggregationScanNode splitNode =
           new ExternalTsFileAggregationScanNode(
               queryId.genPlanNodeId(),
               node.getQualifiedObjectName(),
               node.getOutputSymbols(),
               node.getAssignments(),
-              entries,
               node.getTagAndAttributeIndexMap(),
               node.getScanOrder(),
               node.getTimePredicate().orElse(null),
@@ -866,11 +821,15 @@ public class TableDistributedPlanGenerator
               node.getPreGroupedSymbols(),
               node.getStep(),
               node.getGroupIdSymbol(),
-              node.getTsFilePaths(),
-              splitDeviceOffsets.get(i));
+              node.getExternalTsFileQueryResource(),
+              partition.getDeviceEntryIndexes(),
+              partition.getPartitionIndex(),
+              node.getSchemaFilter());
       splitNode.setRegionReplicaSet(localRegionReplicaSet);
       result.add(splitNode);
     }
+    sortPropertyContext.ifPresent(
+        propertyContext -> applySortProperty(node, result, propertyContext, 
false));
     return result;
   }
 
@@ -1949,18 +1908,26 @@ public class TableDistributedPlanGenerator
       final DeviceTableScanNode deviceTableScanNode,
       final List<PlanNode> resultTableScanNodeList,
       final PlanContext context) {
+    Optional<SortPropertyContext> sortPropertyContext =
+        analyzeSortProperty(deviceTableScanNode, context);
+    if (!sortPropertyContext.isPresent()) {
+      return;
+    }
+    applySortProperty(
+        deviceTableScanNode, resultTableScanNodeList, 
sortPropertyContext.get(), true);
+  }
+
+  private Optional<SortPropertyContext> analyzeSortProperty(
+      DeviceTableScanNode deviceTableScanNode, PlanContext context) {
     final List<Symbol> newOrderingSymbols = new ArrayList<>();
     final List<SortOrder> newSortOrders = new ArrayList<>();
     final OrderingScheme expectedOrderingScheme = 
context.expectedOrderingScheme;
 
     boolean lastIsTimeRelated = false;
+    boolean scanOrderDesc = false;
     for (final Symbol symbol : expectedOrderingScheme.getOrderBy()) {
       if (timeRelatedSymbol(symbol, deviceTableScanNode)) {
-        if (!expectedOrderingScheme.getOrderings().get(symbol).isAscending()) {
-          // TODO(beyyes) move scan order judgement into logical plan optimizer
-          resultTableScanNodeList.forEach(
-              node -> ((DeviceTableScanNode) 
node).setScanOrder(Ordering.DESC));
-        }
+        scanOrderDesc = 
!expectedOrderingScheme.getOrderings().get(symbol).isAscending();
         newOrderingSymbols.add(symbol);
         newSortOrders.add(expectedOrderingScheme.getOrdering(symbol));
         lastIsTimeRelated = true;
@@ -1975,9 +1942,22 @@ public class TableDistributedPlanGenerator
 
     // no sort property can be pushed down into DeviceTableScanNode
     if (newOrderingSymbols.isEmpty()) {
-      return;
+      return Optional.empty();
     }
 
+    return Optional.of(
+        new SortPropertyContext(
+            newOrderingSymbols,
+            newSortOrders,
+            createDeviceEntryComparator(deviceTableScanNode, 
newOrderingSymbols, newSortOrders),
+            lastIsTimeRelated,
+            scanOrderDesc));
+  }
+
+  private Comparator<DeviceEntry> createDeviceEntryComparator(
+      DeviceTableScanNode deviceTableScanNode,
+      List<Symbol> newOrderingSymbols,
+      List<SortOrder> newSortOrders) {
     Optional<IDeviceID.TreeDeviceIdColumnValueExtractor> extractor =
         createTreeDeviceIdColumnValueExtractor(deviceTableScanNode);
     final List<Function<DeviceEntry, String>> orderingRules = new 
ArrayList<>();
@@ -2052,21 +2032,34 @@ public class TableDistributedPlanGenerator
         comparator = comparator.thenComparing(thenComparator);
       }
     }
+    return comparator;
+  }
 
+  private void applySortProperty(
+      final DeviceTableScanNode deviceTableScanNode,
+      final List<PlanNode> resultTableScanNodeList,
+      final SortPropertyContext sortPropertyContext,
+      final boolean sortDeviceEntries) {
     final Optional<OrderingScheme> newOrderingScheme =
         tableScanOrderingSchema(
             
analysis.getTableColumnSchema(deviceTableScanNode.getQualifiedObjectName()),
             deviceTableScanNode.getAssignments(),
-            newOrderingSymbols,
-            newSortOrders,
-            lastIsTimeRelated,
-            deviceTableScanNode.getDeviceEntries().size() == 1);
+            sortPropertyContext.orderingSymbols,
+            sortPropertyContext.sortOrders,
+            sortPropertyContext.lastIsTimeRelated,
+            resultTableScanNodeList.size() == 1
+                && ((DeviceTableScanNode) 
resultTableScanNodeList.get(0)).getDeviceEntries().size()
+                    == 1);
     for (final PlanNode planNode : resultTableScanNodeList) {
       final DeviceTableScanNode scanNode = (DeviceTableScanNode) planNode;
+      if (sortPropertyContext.scanOrderDesc) {
+        // TODO(beyyes) move scan order judgement into logical plan optimizer
+        scanNode.setScanOrder(Ordering.DESC);
+      }
       newOrderingScheme.ifPresent(
           orderingScheme -> nodeOrderingMap.put(scanNode.getPlanNodeId(), 
orderingScheme));
-      if (comparator != null) {
-        scanNode.sortDeviceEntries(comparator);
+      if (sortDeviceEntries && sortPropertyContext.comparator != null) {
+        scanNode.getDeviceEntries().sort(sortPropertyContext.comparator);
       }
     }
   }
@@ -2386,6 +2379,27 @@ public class TableDistributedPlanGenerator
             node.getOutputSymbols()));
   }
 
+  private static class SortPropertyContext {
+    final List<Symbol> orderingSymbols;
+    final List<SortOrder> sortOrders;
+    final Comparator<DeviceEntry> comparator;
+    final boolean lastIsTimeRelated;
+    final boolean scanOrderDesc;
+
+    private SortPropertyContext(
+        List<Symbol> orderingSymbols,
+        List<SortOrder> sortOrders,
+        Comparator<DeviceEntry> comparator,
+        boolean lastIsTimeRelated,
+        boolean scanOrderDesc) {
+      this.orderingSymbols = orderingSymbols;
+      this.sortOrders = sortOrders;
+      this.comparator = comparator;
+      this.lastIsTimeRelated = lastIsTimeRelated;
+      this.scanOrderDesc = scanOrderDesc;
+    }
+  }
+
   public static class PlanContext {
     final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
     boolean hasExchangeNode = false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
index a8a36c6328e..95a833074c1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
@@ -103,9 +103,12 @@ public class PruneTableScanColumns extends 
ProjectOffPushDownRule<TableScanNode>
               externalTsFileScanNode.getPushDownOffset(),
               externalTsFileScanNode.getTimePredicate().orElse(null),
               externalTsFileScanNode.getScanOrder(),
-              externalTsFileScanNode.getTsFilePaths(),
-              externalTsFileScanNode.getDeviceEntries(),
-              externalTsFileScanNode.getDeviceOffsets());
+              externalTsFileScanNode.isPushLimitToEachDevice(),
+              externalTsFileScanNode.getTagAndAttributeIndexMap(),
+              externalTsFileScanNode.getExternalTsFileQueryResource(),
+              externalTsFileScanNode.getDeviceEntryIndexes(),
+              externalTsFileScanNode.getDeviceTaskPartitionIndex(),
+              externalTsFileScanNode.getSchemaFilter());
       
prunedNode.setRegionReplicaSet(externalTsFileScanNode.getRegionReplicaSet());
       return Optional.of(prunedNode);
     } else if (node instanceof DeviceTableScanNode) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
index 19057bc7146..9402e837575 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
@@ -297,28 +297,31 @@ public class AggregationTableScanNode extends 
DeviceTableScanNode {
       DeviceTableScanNode tableScanNode) {
     if (tableScanNode instanceof ExternalTsFileScanNode) {
       ExternalTsFileScanNode externalTsFileScanNode = (ExternalTsFileScanNode) 
tableScanNode;
-      return new ExternalTsFileAggregationScanNode(
-          id,
-          tableScanNode.getQualifiedObjectName(),
-          tableScanNode.getOutputSymbols(),
-          tableScanNode.getAssignments(),
-          tableScanNode.getDeviceEntries(),
-          tableScanNode.getTagAndAttributeIndexMap(),
-          tableScanNode.getScanOrder(),
-          tableScanNode.getTimePredicate().orElse(null),
-          tableScanNode.getPushDownPredicate(),
-          tableScanNode.getPushDownLimit(),
-          tableScanNode.getPushDownOffset(),
-          tableScanNode.isPushLimitToEachDevice(),
-          tableScanNode.containsNonAlignedDevice(),
-          projectNode == null ? null : projectNode.getAssignments(),
-          aggregationNode.getAggregations(),
-          aggregationNode.getGroupingSets(),
-          aggregationNode.getPreGroupedSymbols(),
-          aggregationNode.getStep(),
-          aggregationNode.getGroupIdSymbol(),
-          externalTsFileScanNode.getTsFilePaths(),
-          externalTsFileScanNode.getDeviceOffsets());
+      ExternalTsFileAggregationScanNode scanNode =
+          new ExternalTsFileAggregationScanNode(
+              id,
+              tableScanNode.getQualifiedObjectName(),
+              tableScanNode.getOutputSymbols(),
+              tableScanNode.getAssignments(),
+              tableScanNode.getTagAndAttributeIndexMap(),
+              tableScanNode.getScanOrder(),
+              tableScanNode.getTimePredicate().orElse(null),
+              tableScanNode.getPushDownPredicate(),
+              tableScanNode.getPushDownLimit(),
+              tableScanNode.getPushDownOffset(),
+              tableScanNode.isPushLimitToEachDevice(),
+              tableScanNode.containsNonAlignedDevice(),
+              projectNode == null ? null : projectNode.getAssignments(),
+              aggregationNode.getAggregations(),
+              aggregationNode.getGroupingSets(),
+              aggregationNode.getPreGroupedSymbols(),
+              aggregationNode.getStep(),
+              aggregationNode.getGroupIdSymbol(),
+              externalTsFileScanNode.getExternalTsFileQueryResource(),
+              externalTsFileScanNode.getDeviceEntryIndexes(),
+              externalTsFileScanNode.getDeviceTaskPartitionIndex(),
+              externalTsFileScanNode.getSchemaFilter());
+      return scanNode;
     }
     if (tableScanNode instanceof TreeDeviceViewScanNode) {
       TreeDeviceViewScanNode treeDeviceViewScanNode = (TreeDeviceViewScanNode) 
tableScanNode;
@@ -381,7 +384,6 @@ public class AggregationTableScanNode extends 
DeviceTableScanNode {
           tableScanNode.getQualifiedObjectName(),
           tableScanNode.getOutputSymbols(),
           tableScanNode.getAssignments(),
-          tableScanNode.getDeviceEntries(),
           tableScanNode.getTagAndAttributeIndexMap(),
           tableScanNode.getScanOrder(),
           tableScanNode.getTimePredicate().orElse(null),
@@ -396,8 +398,10 @@ public class AggregationTableScanNode extends 
DeviceTableScanNode {
           aggregationNode.getPreGroupedSymbols(),
           step,
           aggregationNode.getGroupIdSymbol(),
-          externalTsFileScanNode.getTsFilePaths(),
-          externalTsFileScanNode.getDeviceOffsets());
+          externalTsFileScanNode.getExternalTsFileQueryResource(),
+          externalTsFileScanNode.getDeviceEntryIndexes(),
+          externalTsFileScanNode.getDeviceTaskPartitionIndex(),
+          externalTsFileScanNode.getSchemaFilter());
     }
     if (tableScanNode instanceof TreeDeviceViewScanNode) {
       TreeDeviceViewScanNode treeDeviceViewScanNode = (TreeDeviceViewScanNode) 
tableScanNode;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java
index e4fceb9818b..c02c74058af 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java
@@ -41,7 +41,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -276,10 +275,6 @@ public class DeviceTableScanNode extends TableScanNode {
     this.deviceEntries.add(deviceEntry);
   }
 
-  public void sortDeviceEntries(Comparator<DeviceEntry> comparator) {
-    this.deviceEntries.sort(comparator);
-  }
-
   public void setPushLimitToEachDevice(boolean pushLimitToEachDevice) {
     this.pushLimitToEachDevice = pushLimitToEachDevice;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java
index 04434ad498e..dd338dc2d90 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java
@@ -27,32 +27,32 @@ import 
org.apache.iotdb.commons.queryengine.plan.relational.planner.Assignments;
 import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.planner.node.AggregationNode;
 import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression;
-import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
+import org.apache.iotdb.commons.schema.filter.SchemaFilter;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 
+import com.google.common.collect.Lists;
+
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.IntStream;
 
 public class ExternalTsFileAggregationScanNode extends 
AggregationTableScanNode {
-  private List<String> tsFilePaths;
-  private List<List<ExternalTsFileDeviceOffset>> deviceOffsets = 
Collections.emptyList();
+  private final ExternalTsFileQueryResource externalTsFileQueryResource;
+  private List<Integer> deviceEntryIndexes;
+  private int deviceTaskPartitionIndex = -1;
+  private SchemaFilter schemaFilter;
 
   public ExternalTsFileAggregationScanNode(
       PlanNodeId id,
       QualifiedObjectName qualifiedObjectName,
       List<Symbol> outputSymbols,
       Map<Symbol, ColumnSchema> assignments,
-      List<DeviceEntry> deviceEntries,
       Map<Symbol, Integer> tagAndAttributeIndexMap,
       Ordering scanOrder,
       Expression timePredicate,
@@ -67,14 +67,16 @@ public class ExternalTsFileAggregationScanNode extends 
AggregationTableScanNode
       List<Symbol> preGroupedSymbols,
       AggregationNode.Step step,
       Optional<Symbol> groupIdSymbol,
-      List<String> tsFilePaths,
-      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
+      ExternalTsFileQueryResource externalTsFileQueryResource,
+      List<Integer> deviceEntryIndexes,
+      int deviceTaskPartitionIndex,
+      SchemaFilter schemaFilter) {
     super(
         id,
         qualifiedObjectName,
         outputSymbols,
         assignments,
-        deviceEntries,
+        Lists.transform(deviceEntryIndexes, 
externalTsFileQueryResource.getDeviceEntries()::get),
         tagAndAttributeIndexMap,
         scanOrder,
         timePredicate,
@@ -89,12 +91,12 @@ public class ExternalTsFileAggregationScanNode extends 
AggregationTableScanNode
         preGroupedSymbols,
         step,
         groupIdSymbol);
-    this.tsFilePaths = Collections.unmodifiableList(new 
ArrayList<>(tsFilePaths));
-    this.deviceOffsets = copyDeviceOffsets(deviceOffsets);
+    this.externalTsFileQueryResource = externalTsFileQueryResource;
+    this.deviceEntryIndexes = deviceEntryIndexes;
+    this.deviceTaskPartitionIndex = deviceTaskPartitionIndex;
+    this.schemaFilter = schemaFilter;
   }
 
-  protected ExternalTsFileAggregationScanNode() {}
-
   @Override
   public <R, C> R accept(IPlanVisitor<R, C> visitor, C context) {
     return ((PlanVisitor<R, C>) 
visitor).visitExternalTsFileAggregationScan(this, context);
@@ -107,7 +109,6 @@ public class ExternalTsFileAggregationScanNode extends 
AggregationTableScanNode
         qualifiedObjectName,
         outputSymbols,
         assignments,
-        deviceEntries,
         tagAndAttributeIndexMap,
         scanOrder,
         timePredicate,
@@ -122,47 +123,40 @@ public class ExternalTsFileAggregationScanNode extends 
AggregationTableScanNode
         preGroupedSymbols,
         step,
         groupIdSymbol,
-        tsFilePaths,
-        deviceOffsets);
+        externalTsFileQueryResource,
+        deviceEntryIndexes,
+        deviceTaskPartitionIndex,
+        schemaFilter);
   }
 
   public List<String> getTsFilePaths() {
-    return tsFilePaths;
+    return externalTsFileQueryResource.getTsFilePaths();
+  }
+
+  public ExternalTsFileQueryResource getExternalTsFileQueryResource() {
+    return externalTsFileQueryResource;
+  }
+
+  public List<Integer> getDeviceEntryIndexes() {
+    return deviceEntryIndexes;
   }
 
-  public List<List<ExternalTsFileDeviceOffset>> getDeviceOffsets() {
-    return deviceOffsets;
+  public int getDeviceTaskPartitionIndex() {
+    return deviceTaskPartitionIndex;
   }
 
   @Override
-  public void sortDeviceEntries(Comparator<DeviceEntry> comparator) {
-    int[] indexes =
-        IntStream.range(0, deviceEntries.size())
-            .boxed()
-            .sorted(
-                (left, right) ->
-                    comparator.compare(deviceEntries.get(left), 
deviceEntries.get(right)))
-            .mapToInt(Integer::intValue)
-            .toArray();
-    List<DeviceEntry> sortedDeviceEntries = new 
ArrayList<>(deviceEntries.size());
-    List<List<ExternalTsFileDeviceOffset>> sortedDeviceOffsets =
-        new ArrayList<>(deviceOffsets.size());
-    for (int index : indexes) {
-      sortedDeviceEntries.add(deviceEntries.get(index));
-      sortedDeviceOffsets.add(deviceOffsets.get(index));
-    }
-    this.deviceEntries = sortedDeviceEntries;
-    this.deviceOffsets = sortedDeviceOffsets;
+  public void setDeviceEntries(List<DeviceEntry> deviceEntries) {
+    throw new UnsupportedOperationException(
+        "ExternalTsFileAggregationScanNode device entries must be set by 
device entry indexes");
+  }
+
+  public SchemaFilter getSchemaFilter() {
+    return schemaFilter;
   }
 
-  private static List<List<ExternalTsFileDeviceOffset>> copyDeviceOffsets(
-      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
-    List<List<ExternalTsFileDeviceOffset>> copiedDeviceOffsets =
-        new ArrayList<>(deviceOffsets.size());
-    for (List<ExternalTsFileDeviceOffset> offsets : deviceOffsets) {
-      copiedDeviceOffsets.add(new ArrayList<>(offsets));
-    }
-    return copiedDeviceOffsets;
+  public void setSchemaFilter(SchemaFilter schemaFilter) {
+    this.schemaFilter = schemaFilter;
   }
 
   @Override
@@ -177,11 +171,6 @@ public class ExternalTsFileAggregationScanNode extends 
AggregationTableScanNode
         "ExternalTsFileAggregationScanNode cannot be serialized because it 
reads local external TsFiles");
   }
 
-  public static ExternalTsFileAggregationScanNode deserialize(ByteBuffer 
byteBuffer) {
-    throw new UnsupportedOperationException(
-        "ExternalTsFileAggregationScanNode cannot be deserialized because it 
reads local external TsFiles");
-  }
-
   @Override
   public String toString() {
     return "ExternalTsFileAggregationScanNode-" + this.getPlanNodeId();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java
index f21a7a755c0..be83256c404 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java
@@ -25,26 +25,26 @@ import 
org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchem
 import 
org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
 import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression;
-import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
+import org.apache.iotdb.commons.schema.filter.SchemaFilter;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 
+import com.google.common.collect.Lists;
+
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.IntStream;
-
-import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TAG;
 
 public class ExternalTsFileScanNode extends DeviceTableScanNode {
-  private List<String> tsFilePaths;
-  private List<List<ExternalTsFileDeviceOffset>> deviceOffsets = 
Collections.emptyList();
+  private ExternalTsFileQueryResource externalTsFileQueryResource;
+  private List<Integer> deviceEntryIndexes = Collections.emptyList();
+  private int deviceTaskPartitionIndex = -1;
+  private SchemaFilter schemaFilter;
 
   protected ExternalTsFileScanNode() {}
 
@@ -53,133 +53,10 @@ public class ExternalTsFileScanNode extends 
DeviceTableScanNode {
       QualifiedObjectName qualifiedObjectName,
       List<Symbol> outputSymbols,
       Map<Symbol, ColumnSchema> assignments,
-      List<String> tsFilePaths) {
-    this(
-        id,
-        qualifiedObjectName,
-        outputSymbols,
-        assignments,
-        tsFilePaths,
-        Collections.emptyList(),
-        Collections.emptyList());
-  }
-
-  public ExternalTsFileScanNode(
-      PlanNodeId id,
-      QualifiedObjectName qualifiedObjectName,
-      List<Symbol> outputSymbols,
-      Map<Symbol, ColumnSchema> assignments,
-      List<String> tsFilePaths,
-      List<DeviceEntry> deviceEntries,
-      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
-    super(id, qualifiedObjectName, outputSymbols, assignments, 
buildTagIndexMap(assignments));
-    this.tsFilePaths = Collections.unmodifiableList(new 
ArrayList<>(tsFilePaths));
-    this.deviceEntries = new ArrayList<>(deviceEntries);
-    this.deviceOffsets = copyDeviceOffsets(deviceOffsets);
-  }
-
-  public ExternalTsFileScanNode(
-      PlanNodeId id,
-      QualifiedObjectName qualifiedObjectName,
-      List<Symbol> outputSymbols,
-      Map<Symbol, ColumnSchema> assignments,
-      Expression pushDownPredicate,
-      long pushDownLimit,
-      long pushDownOffset,
-      Ordering scanOrder,
-      List<String> tsFilePaths) {
-    this(
-        id,
-        qualifiedObjectName,
-        outputSymbols,
-        assignments,
-        pushDownPredicate,
-        pushDownLimit,
-        pushDownOffset,
-        scanOrder,
-        tsFilePaths,
-        Collections.emptyList());
-  }
-
-  public ExternalTsFileScanNode(
-      PlanNodeId id,
-      QualifiedObjectName qualifiedObjectName,
-      List<Symbol> outputSymbols,
-      Map<Symbol, ColumnSchema> assignments,
-      Expression pushDownPredicate,
-      long pushDownLimit,
-      long pushDownOffset,
-      Ordering scanOrder,
-      List<String> tsFilePaths,
-      List<DeviceEntry> deviceEntries) {
-    this(
-        id,
-        qualifiedObjectName,
-        outputSymbols,
-        assignments,
-        pushDownPredicate,
-        pushDownLimit,
-        pushDownOffset,
-        null,
-        scanOrder,
-        tsFilePaths,
-        deviceEntries,
-        Collections.emptyList());
-  }
-
-  public ExternalTsFileScanNode(
-      PlanNodeId id,
-      QualifiedObjectName qualifiedObjectName,
-      List<Symbol> outputSymbols,
-      Map<Symbol, ColumnSchema> assignments,
-      Expression pushDownPredicate,
-      long pushDownLimit,
-      long pushDownOffset,
-      Expression timePredicate,
-      Ordering scanOrder,
-      List<String> tsFilePaths) {
-    this(
-        id,
-        qualifiedObjectName,
-        outputSymbols,
-        assignments,
-        pushDownPredicate,
-        pushDownLimit,
-        pushDownOffset,
-        timePredicate,
-        scanOrder,
-        tsFilePaths,
-        Collections.emptyList(),
-        Collections.emptyList());
-  }
-
-  public ExternalTsFileScanNode(
-      PlanNodeId id,
-      QualifiedObjectName qualifiedObjectName,
-      List<Symbol> outputSymbols,
-      Map<Symbol, ColumnSchema> assignments,
-      Expression pushDownPredicate,
-      long pushDownLimit,
-      long pushDownOffset,
-      Expression timePredicate,
-      Ordering scanOrder,
-      List<String> tsFilePaths,
-      List<DeviceEntry> deviceEntries,
-      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
-    this(
-        id,
-        qualifiedObjectName,
-        outputSymbols,
-        assignments,
-        pushDownPredicate,
-        pushDownLimit,
-        pushDownOffset,
-        timePredicate,
-        scanOrder,
-        false,
-        tsFilePaths,
-        deviceEntries,
-        deviceOffsets);
+      Map<Symbol, Integer> tagAndAttributeIndexMap,
+      ExternalTsFileQueryResource externalTsFileQueryResource) {
+    super(id, qualifiedObjectName, outputSymbols, assignments, 
tagAndAttributeIndexMap);
+    this.externalTsFileQueryResource = externalTsFileQueryResource;
   }
 
   public ExternalTsFileScanNode(
@@ -193,16 +70,18 @@ public class ExternalTsFileScanNode extends 
DeviceTableScanNode {
       Expression timePredicate,
       Ordering scanOrder,
       boolean pushLimitToEachDevice,
-      List<String> tsFilePaths,
-      List<DeviceEntry> deviceEntries,
-      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
+      Map<Symbol, Integer> tagAndAttributeIndexMap,
+      ExternalTsFileQueryResource externalTsFileQueryResource,
+      List<Integer> deviceEntryIndexes,
+      int deviceTaskPartitionIndex,
+      SchemaFilter schemaFilter) {
     super(
         id,
         qualifiedObjectName,
         outputSymbols,
         assignments,
-        new ArrayList<>(deviceEntries),
-        buildTagIndexMap(assignments),
+        Lists.transform(deviceEntryIndexes, 
externalTsFileQueryResource.getDeviceEntries()::get),
+        tagAndAttributeIndexMap,
         scanOrder,
         timePredicate,
         pushDownPredicate,
@@ -210,8 +89,10 @@ public class ExternalTsFileScanNode extends 
DeviceTableScanNode {
         pushDownOffset,
         pushLimitToEachDevice,
         false);
-    this.tsFilePaths = Collections.unmodifiableList(new 
ArrayList<>(tsFilePaths));
-    this.deviceOffsets = copyDeviceOffsets(deviceOffsets);
+    this.externalTsFileQueryResource = externalTsFileQueryResource;
+    this.deviceEntryIndexes = deviceEntryIndexes;
+    this.deviceTaskPartitionIndex = deviceTaskPartitionIndex;
+    this.schemaFilter = schemaFilter;
   }
 
   @Override
@@ -232,52 +113,41 @@ public class ExternalTsFileScanNode extends 
DeviceTableScanNode {
         timePredicate,
         scanOrder,
         pushLimitToEachDevice,
-        tsFilePaths,
-        deviceEntries,
-        deviceOffsets);
+        tagAndAttributeIndexMap,
+        externalTsFileQueryResource,
+        deviceEntryIndexes,
+        deviceTaskPartitionIndex,
+        schemaFilter);
   }
 
   public List<String> getTsFilePaths() {
-    return tsFilePaths;
+    return externalTsFileQueryResource.getTsFilePaths();
+  }
+
+  public ExternalTsFileQueryResource getExternalTsFileQueryResource() {
+    return externalTsFileQueryResource;
   }
 
-  public List<List<ExternalTsFileDeviceOffset>> getDeviceOffsets() {
-    return deviceOffsets;
+  public List<Integer> getDeviceEntryIndexes() {
+    return deviceEntryIndexes;
   }
 
-  public void setDeviceOffsets(List<List<ExternalTsFileDeviceOffset>> 
deviceOffsets) {
-    this.deviceOffsets = copyDeviceOffsets(deviceOffsets);
+  public int getDeviceTaskPartitionIndex() {
+    return deviceTaskPartitionIndex;
   }
 
   @Override
-  public void sortDeviceEntries(Comparator<DeviceEntry> comparator) {
-    int[] indexes =
-        IntStream.range(0, deviceEntries.size())
-            .boxed()
-            .sorted(
-                (left, right) ->
-                    comparator.compare(deviceEntries.get(left), 
deviceEntries.get(right)))
-            .mapToInt(Integer::intValue)
-            .toArray();
-    List<DeviceEntry> sortedDeviceEntries = new 
ArrayList<>(deviceEntries.size());
-    List<List<ExternalTsFileDeviceOffset>> sortedDeviceOffsets =
-        new ArrayList<>(deviceOffsets.size());
-    for (int index : indexes) {
-      sortedDeviceEntries.add(deviceEntries.get(index));
-      sortedDeviceOffsets.add(deviceOffsets.get(index));
-    }
-    this.deviceEntries = sortedDeviceEntries;
-    this.deviceOffsets = sortedDeviceOffsets;
+  public void setDeviceEntries(List<DeviceEntry> deviceEntries) {
+    throw new UnsupportedOperationException(
+        "ExternalTsFileScanNode device entries must be set by device entry 
indexes");
+  }
+
+  public SchemaFilter getSchemaFilter() {
+    return schemaFilter;
   }
 
-  private static List<List<ExternalTsFileDeviceOffset>> copyDeviceOffsets(
-      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
-    List<List<ExternalTsFileDeviceOffset>> copiedDeviceOffsets =
-        new ArrayList<>(deviceOffsets.size());
-    for (List<ExternalTsFileDeviceOffset> offsets : deviceOffsets) {
-      copiedDeviceOffsets.add(new ArrayList<>(offsets));
-    }
-    return copiedDeviceOffsets;
+  public void setSchemaFilter(SchemaFilter schemaFilter) {
+    this.schemaFilter = schemaFilter;
   }
 
   @Override
@@ -292,24 +162,8 @@ public class ExternalTsFileScanNode extends 
DeviceTableScanNode {
         "ExternalTsFileScanNode cannot be serialized because it reads local 
external TsFiles");
   }
 
-  public static ExternalTsFileScanNode deserialize(ByteBuffer byteBuffer) {
-    throw new UnsupportedOperationException(
-        "ExternalTsFileScanNode cannot be deserialized because it reads local 
external TsFiles");
-  }
-
   @Override
   public String toString() {
     return "ExternalTsFileScanNode-" + this.getPlanNodeId();
   }
-
-  private static Map<Symbol, Integer> buildTagIndexMap(Map<Symbol, 
ColumnSchema> assignments) {
-    Map<Symbol, Integer> tagIndexMap = new java.util.HashMap<>();
-    int tagIndex = 0;
-    for (Map.Entry<Symbol, ColumnSchema> entry : assignments.entrySet()) {
-      if (TAG.equals(entry.getValue().getColumnCategory())) {
-        tagIndexMap.put(entry.getKey(), tagIndex++);
-      }
-    }
-    return tagIndexMap;
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index af6ddc5db1c..5ffac0473ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -53,12 +53,10 @@ import org.apache.iotdb.commons.schema.filter.SchemaFilter;
 import org.apache.iotdb.commons.schema.table.InformationSchema;
 import org.apache.iotdb.commons.schema.table.TsTable;
 import org.apache.iotdb.commons.schema.table.column.TagColumnSchema;
-import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
-import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileDeviceFilterVisitor;
 import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
@@ -68,7 +66,6 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.Conver
 import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicateCombineIntoTableScanChecker;
 import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicatePushIntoMetadataChecker;
 import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.schema.ConvertSchemaPredicateToFilterVisitor;
-import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.NonAlignedDeviceEntry;
@@ -87,24 +84,18 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceVi
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.read.LazyTsFileDeviceIterator;
-import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.common.type.Type;
 import org.apache.tsfile.read.filter.basic.Filter;
-import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -560,43 +551,8 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
 
     private void collectExternalTsFileDeviceTasks(
         ExternalTsFileScanNode tableScanNode, List<Expression> 
metadataExpressions) {
-      SchemaFilter deviceFilter =
-          constructExternalTsFileDeviceFilter(tableScanNode, 
metadataExpressions);
-      ExternalTsFileDeviceFilterVisitor deviceFilterVisitor =
-          new ExternalTsFileDeviceFilterVisitor();
-      Map<IDeviceID, List<ExternalTsFileDeviceOffset>> deviceOffsetMap = new 
LinkedHashMap<>();
-      for (String tsFilePath : tableScanNode.getTsFilePaths()) {
-        try (TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFilePath)) {
-          LazyTsFileDeviceIterator deviceIterator =
-              new LazyTsFileDeviceIterator(
-                  reader, 
tableScanNode.getQualifiedObjectName().getObjectName(), ignored -> {});
-          while (deviceIterator.hasNext()) {
-            IDeviceID deviceID = deviceIterator.next();
-            if (deviceFilter != null
-                && 
!Boolean.TRUE.equals(deviceFilter.accept(deviceFilterVisitor, deviceID))) {
-              continue;
-            }
-            deviceOffsetMap
-                .computeIfAbsent(deviceID, ignored -> new ArrayList<>())
-                .add(
-                    new ExternalTsFileDeviceOffset(
-                        tsFilePath, 
deviceIterator.getCurrentDeviceMeasurementNodeOffset()));
-          }
-        } catch (IOException e) {
-          throw new SemanticException(
-              "Failed to collect devices from external TsFile: " + tsFilePath);
-        }
-      }
-      List<DeviceEntry> deviceEntries = new 
ArrayList<>(deviceOffsetMap.size());
-      List<List<ExternalTsFileDeviceOffset>> deviceOffsets =
-          new ArrayList<>(deviceOffsetMap.size());
-      for (Map.Entry<IDeviceID, List<ExternalTsFileDeviceOffset>> entry :
-          deviceOffsetMap.entrySet()) {
-        deviceEntries.add(new AlignedDeviceEntry(entry.getKey(), new 
Binary[0]));
-        deviceOffsets.add(entry.getValue());
-      }
-      tableScanNode.setDeviceEntries(deviceEntries);
-      tableScanNode.setDeviceOffsets(deviceOffsets);
+      tableScanNode.setSchemaFilter(
+          constructExternalTsFileDeviceFilter(tableScanNode, 
metadataExpressions));
     }
 
     private SchemaFilter constructExternalTsFileDeviceFilter(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
index b72b05fbff5..d11a012a7d5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
@@ -298,9 +298,12 @@ public class UnaliasSymbolReferences implements 
PlanOptimizer {
               node.getPushDownOffset(),
               node.getTimePredicate().map(mapper::map).orElse(null),
               node.getScanOrder(),
-              node.getTsFilePaths(),
-              node.getDeviceEntries(),
-              node.getDeviceOffsets());
+              node.isPushLimitToEachDevice(),
+              node.getTagAndAttributeIndexMap(),
+              node.getExternalTsFileQueryResource(),
+              node.getDeviceEntryIndexes(),
+              node.getDeviceTaskPartitionIndex(),
+              node.getSchemaFilter());
       rewrittenNode.setRegionReplicaSet(node.getRegionReplicaSet());
       return new PlanAndMappings(rewrittenNode, mapping);
     }

Reply via email to