This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch read_tsfile_table_function
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 342c4e1e17d9b6c60934186eb3ec6083e8358aad
Author: shuwenwei <[email protected]>
AuthorDate: Thu Jun 4 18:38:44 2026 +0800

    impl 2
---
 ...r.java => ExternalTsFileTableScanOperator.java} | 142 +++++--------
 .../UnorderedExternalTsFileTableScanOperator.java  |  71 ++++++-
 .../planner/DataNodeTableOperatorGenerator.java    |  52 +----
 .../plan/planner/plan/node/PlanGraphPrinter.java   |   8 -
 .../distribute/TableDistributedPlanGenerator.java  | 144 ++++++++++++-
 .../iterative/rule/PruneTableScanColumns.java      |   6 +-
 .../planner/node/ExternalTsFileScanNode.java       | 222 +++++++++++----------
 .../optimizations/PushPredicateIntoTableScan.java  |  94 ++++++++-
 .../optimizations/UnaliasSymbolReferences.java     |   6 +-
 .../relational/tvf/ReadTsFileTableFunction.java    |  47 ++---
 10 files changed, 490 insertions(+), 302 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
similarity index 51%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
index b1eb554a3ef..0c68ec96c9b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
@@ -19,46 +19,45 @@
 
 package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 
-import org.apache.iotdb.commons.schema.filter.SchemaFilter;
+import org.apache.iotdb.commons.path.AlignedFullPath;
+import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
 import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.db.utils.EncryptDBUtils;
 
-import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.read.TsFileSequenceReader;
-import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata;
 import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import static 
org.apache.iotdb.calc.plan.planner.CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING;
 
-public class UnorderedExternalTsFileTableScanOperator extends 
AbstractTableScanOperator {
+public class ExternalTsFileTableScanOperator extends AbstractTableScanOperator 
{
   private static final long INSTANCE_SIZE =
-      
RamUsageEstimator.shallowSizeOfInstance(UnorderedExternalTsFileTableScanOperator.class);
+      
RamUsageEstimator.shallowSizeOfInstance(ExternalTsFileTableScanOperator.class);
 
   private final String tableName;
-  private final SchemaFilter deviceFilter;
+  private final List<DeviceEntry> deviceEntries;
+  private final List<List<ExternalTsFileDeviceOffset>> deviceOffsets;
 
-  private MultiTsFileResourceIterator deviceIterator;
-  private Map<TsFileResource, TsFileSequenceReader> resourceReaderMap = 
Collections.emptyMap();
   private DeviceEntry currentDeviceEntry;
+  private List<ExternalTsFileDeviceOffset> currentDeviceOffsets;
   private int currentDeviceIndex;
 
-  public UnorderedExternalTsFileTableScanOperator(
-      AbstractTableScanOperatorParameter parameter, String tableName, 
SchemaFilter deviceFilter) {
+  public ExternalTsFileTableScanOperator(
+      AbstractTableScanOperatorParameter parameter,
+      String tableName,
+      List<DeviceEntry> deviceEntries,
+      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
     super(parameter);
     this.tableName = tableName;
-    this.deviceFilter = deviceFilter;
+    this.deviceEntries = new ArrayList<>(deviceEntries);
+    this.deviceOffsets = new ArrayList<>(deviceOffsets);
     this.currentDeviceIndex = 0;
   }
 
@@ -76,69 +75,22 @@ public class UnorderedExternalTsFileTableScanOperator 
extends AbstractTableScanO
   @Override
   public void initQueryDataSource(IQueryDataSource dataSource) {
     super.initQueryDataSource(dataSource);
-
-    QueryDataSource queryDataSource = (QueryDataSource) dataSource;
-    initDeviceIterator(queryDataSource);
     currentDeviceEntry = nextDeviceEntry();
     recordCurrentDeviceIndex();
     constructAlignedSeriesScanUtil();
     if (seriesScanUtil != null) {
-      seriesScanUtil.initQueryDataSource(queryDataSource);
+      seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource);
     }
   }
 
-  private void initDeviceIterator(QueryDataSource queryDataSource) {
-    resourceReaderMap = 
createResourceReaderMap(getAllResources(queryDataSource));
-    deviceIterator =
-        new MultiTsFileResourceIterator(
-            tableName,
-            queryDataSource.getSeqResources(),
-            queryDataSource.getUnseqResources(),
-            resourceReaderMap,
-            ((OperatorContext) operatorContext).getInstanceContext(),
-            seriesScanOptions,
-            deviceFilter);
-  }
-
-  private Map<TsFileResource, TsFileSequenceReader> createResourceReaderMap(
-      List<TsFileResource> resources) {
-    Map<TsFileResource, TsFileSequenceReader> readerMap = new 
HashMap<>(resources.size());
-    for (TsFileResource resource : resources) {
-      try {
-        readerMap.put(
-            resource,
-            new TsFileSequenceReader(
-                resource.getTsFilePath(),
-                ((OperatorContext) operatorContext)
-                        .getInstanceContext()
-                        .getQueryStatistics()
-                        .getLoadTimeSeriesMetadataActualIOSize()
-                    ::addAndGet,
-                
EncryptDBUtils.getFirstEncryptParamFromTSFilePath(resource.getTsFilePath())));
-      } catch (IOException e) {
-        closeResourceReaders(readerMap);
-        throw new RuntimeException(
-            "Failed to open external TsFile reader: " + 
resource.getTsFilePath(), e);
-      }
-    }
-    return readerMap;
-  }
-
-  private List<TsFileResource> getAllResources(QueryDataSource 
queryDataSource) {
-    List<TsFileResource> resources =
-        new ArrayList<>(
-            queryDataSource.getSeqResources().size() + 
queryDataSource.getUnseqResources().size());
-    resources.addAll(queryDataSource.getSeqResources());
-    resources.addAll(queryDataSource.getUnseqResources());
-    return resources;
-  }
-
   private DeviceEntry nextDeviceEntry() {
-    if (deviceIterator == null || !deviceIterator.hasNextDevice()) {
+    if (currentDeviceIndex >= deviceEntries.size()) {
+      currentDeviceOffsets = null;
       return null;
     }
-    IDeviceID nextDevice = deviceIterator.nextDevice();
-    return nextDevice == null ? null : new AlignedDeviceEntry(nextDevice, new 
Binary[0]);
+    DeviceEntry deviceEntry = deviceEntries.get(currentDeviceIndex);
+    currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex);
+    return deviceEntry;
   }
 
   @Override
@@ -184,15 +136,38 @@ public class UnorderedExternalTsFileTableScanOperator 
extends AbstractTableScanO
             ((OperatorContext) operatorContext).getInstanceContext(),
             true,
             measurementColumnTSDataTypes,
-            deviceIterator);
+            this::loadTimeSeriesMetadata);
   }
 
-  @Override
-  public void close() throws Exception {
-    closeResourceReaders(resourceReaderMap);
-    resourceReaderMap = Collections.emptyMap();
-    deviceIterator = null;
-    super.close();
+  private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata(
+      TsFileResource resource, AlignedFullPath alignedPath) throws IOException 
{
+    if (currentDeviceOffsets == null
+        || 
!getCurrentDeviceEntry().getDeviceID().equals(alignedPath.getDeviceId())) {
+      return null;
+    }
+
+    long[] deviceMeasurementNodeOffset = 
getDeviceMeasurementNodeOffset(resource.getTsFilePath());
+    if (deviceMeasurementNodeOffset == null) {
+      return null;
+    }
+    // TODO: Use deviceMeasurementNodeOffset after FileLoaderUtils supports 
offset-based metadata
+    // loading in this branch.
+    return FileLoaderUtils.loadAlignedTimeSeriesMetadata(
+        resource,
+        alignedPath,
+        ((OperatorContext) operatorContext).getInstanceContext(),
+        seriesScanOptions.getGlobalTimeFilter(),
+        resource.isSeq(),
+        ((OperatorContext) 
operatorContext).getInstanceContext().isIgnoreAllNullRows());
+  }
+
+  private long[] getDeviceMeasurementNodeOffset(String tsFilePath) {
+    for (ExternalTsFileDeviceOffset offset : currentDeviceOffsets) {
+      if (tsFilePath.equals(offset.getTsFilePath())) {
+        return offset.getDeviceMeasurementNodeOffset();
+      }
+    }
+    return null;
   }
 
   @Override
@@ -200,16 +175,7 @@ public class UnorderedExternalTsFileTableScanOperator 
extends AbstractTableScanO
     return super.ramBytesUsed()
         + INSTANCE_SIZE
         - AbstractTableScanOperator.INSTANCE_SIZE
-        + RamUsageEstimator.sizeOfMap(resourceReaderMap);
-  }
-
-  private void closeResourceReaders(Map<TsFileResource, TsFileSequenceReader> 
readerMap) {
-    for (TsFileSequenceReader reader : readerMap.values()) {
-      try {
-        reader.close();
-      } catch (IOException ignored) {
-        // ignore close failure
-      }
-    }
+        + RamUsageEstimator.sizeOfCollection(deviceEntries)
+        + RamUsageEstimator.sizeOfCollection(deviceOffsets);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java
index b1eb554a3ef..215fdd66f33 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java
@@ -19,8 +19,11 @@
 
 package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 
+import org.apache.iotdb.commons.path.AlignedFullPath;
 import org.apache.iotdb.commons.schema.filter.SchemaFilter;
+import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
@@ -28,6 +31,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.utils.EncryptDBUtils;
 
+import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.utils.Binary;
@@ -48,17 +52,28 @@ public class UnorderedExternalTsFileTableScanOperator 
extends AbstractTableScanO
 
   private final String tableName;
   private final SchemaFilter deviceFilter;
+  private final List<DeviceEntry> deviceEntries;
+  private final List<List<ExternalTsFileDeviceOffset>> deviceOffsets;
+  private final ExternalTsFileDeviceFilterVisitor deviceFilterVisitor =
+      new ExternalTsFileDeviceFilterVisitor();
 
   private MultiTsFileResourceIterator deviceIterator;
   private Map<TsFileResource, TsFileSequenceReader> resourceReaderMap = 
Collections.emptyMap();
   private DeviceEntry currentDeviceEntry;
+  private List<ExternalTsFileDeviceOffset> currentDeviceOffsets;
   private int currentDeviceIndex;
 
   public UnorderedExternalTsFileTableScanOperator(
-      AbstractTableScanOperatorParameter parameter, String tableName, 
SchemaFilter deviceFilter) {
+      AbstractTableScanOperatorParameter parameter,
+      String tableName,
+      SchemaFilter deviceFilter,
+      List<DeviceEntry> deviceEntries,
+      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
     super(parameter);
     this.tableName = tableName;
     this.deviceFilter = deviceFilter;
+    this.deviceEntries = deviceEntries;
+    this.deviceOffsets = deviceOffsets;
     this.currentDeviceIndex = 0;
   }
 
@@ -78,7 +93,9 @@ public class UnorderedExternalTsFileTableScanOperator extends 
AbstractTableScanO
     super.initQueryDataSource(dataSource);
 
     QueryDataSource queryDataSource = (QueryDataSource) dataSource;
-    initDeviceIterator(queryDataSource);
+    if (deviceEntries.isEmpty()) {
+      initDeviceIterator(queryDataSource);
+    }
     currentDeviceEntry = nextDeviceEntry();
     recordCurrentDeviceIndex();
     constructAlignedSeriesScanUtil();
@@ -134,6 +151,19 @@ public class UnorderedExternalTsFileTableScanOperator 
extends AbstractTableScanO
   }
 
   private DeviceEntry nextDeviceEntry() {
+    if (!deviceEntries.isEmpty()) {
+      while (currentDeviceIndex < deviceEntries.size()) {
+        currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex);
+        DeviceEntry deviceEntry = deviceEntries.get(currentDeviceIndex);
+        if (isDeviceMatched(deviceEntry.getDeviceID())) {
+          return deviceEntry;
+        }
+        currentDeviceIndex++;
+      }
+      currentDeviceOffsets = null;
+      return null;
+    }
+
     if (deviceIterator == null || !deviceIterator.hasNextDevice()) {
       return null;
     }
@@ -184,7 +214,42 @@ public class UnorderedExternalTsFileTableScanOperator 
extends AbstractTableScanO
             ((OperatorContext) operatorContext).getInstanceContext(),
             true,
             measurementColumnTSDataTypes,
-            deviceIterator);
+            deviceEntries.isEmpty()
+                ? deviceIterator::loadTimeSeriesMetadata
+                : this::loadTimeSeriesMetadata);
+  }
+
+  private boolean isDeviceMatched(IDeviceID deviceID) {
+    return deviceFilter == null
+        || Boolean.TRUE.equals(deviceFilter.accept(deviceFilterVisitor, 
deviceID));
+  }
+
+  private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata(
+      TsFileResource resource, AlignedFullPath alignedPath) throws IOException 
{
+    if (currentDeviceOffsets == null
+        || 
!getCurrentDeviceEntry().getDeviceID().equals(alignedPath.getDeviceId())) {
+      return null;
+    }
+    if (!containsCurrentDevice(resource)) {
+      return null;
+    }
+    return FileLoaderUtils.loadAlignedTimeSeriesMetadata(
+        resource,
+        alignedPath,
+        ((OperatorContext) operatorContext).getInstanceContext(),
+        seriesScanOptions.getGlobalTimeFilter(),
+        resource.isSeq(),
+        ((OperatorContext) 
operatorContext).getInstanceContext().isIgnoreAllNullRows());
+  }
+
+  private boolean containsCurrentDevice(TsFileResource resource) {
+    String tsFilePath = resource.getTsFilePath();
+    for (ExternalTsFileDeviceOffset offset : currentDeviceOffsets) {
+      if (tsFilePath.equals(offset.getTsFilePath())) {
+        return true;
+      }
+    }
+    return false;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
index a857e063304..6faca66d7aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
@@ -51,9 +51,7 @@ import 
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.FunctionCall
 import 
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.LongLiteral;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.type.InternalTypeManager;
 import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils;
-import org.apache.iotdb.commons.schema.filter.SchemaFilter;
 import org.apache.iotdb.commons.schema.table.TsTable;
-import org.apache.iotdb.commons.schema.table.column.TagColumnSchema;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -98,22 +96,20 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.Abst
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.CteScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DefaultAggTableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DeviceIteratorScanOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileTableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.InformationSchemaTableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.LastQueryAggTableScanOperator;
-import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.OrderedExternalTsFileTableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewAggregationScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeNonAlignedDeviceViewAggregationScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeToTableViewAdaptorOperator;
-import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.UnorderedExternalTsFileTableScanOperator;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.read.CountSchemaMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
 import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.ConvertPredicateToTimeFilterVisitor;
-import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.schema.ConvertSchemaPredicateToFilterVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.NonAlignedDeviceEntry;
@@ -1132,18 +1128,13 @@ public class DataNodeTableOperatorGenerator
       ExternalTsFileScanNode node, LocalExecutionPlanContext context) {
     AbstractTableScanOperator.AbstractTableScanOperatorParameter parameter =
         constructExternalTsFileTableScanOperatorParameter(node, context);
-    SchemaFilter deviceFilter = constructExternalTsFileDeviceFilter(node);
 
     AbstractTableScanOperator externalTsFileTableScanOperator =
-        node.getPushedOrderingScheme().isPresent()
-            ? new OrderedExternalTsFileTableScanOperator(
-                parameter,
-                node.getQualifiedObjectName().getObjectName(),
-                node.getAssignments(),
-                node.getPushedOrderingScheme().get(),
-                deviceFilter)
-            : new UnorderedExternalTsFileTableScanOperator(
-                parameter, node.getQualifiedObjectName().getObjectName(), 
deviceFilter);
+        new ExternalTsFileTableScanOperator(
+            parameter,
+            node.getQualifiedObjectName().getObjectName(),
+            node.getDeviceEntries(),
+            node.getDeviceOffsets());
 
     
context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName());
 
@@ -1156,31 +1147,6 @@ public class DataNodeTableOperatorGenerator
     return externalTsFileTableScanOperator;
   }
 
-  private SchemaFilter 
constructExternalTsFileDeviceFilter(ExternalTsFileScanNode node) {
-    if (!node.getTagPredicate().isPresent()) {
-      return null;
-    }
-    TsTable table = new TsTable(node.getQualifiedObjectName().getObjectName());
-    for (Map.Entry<Symbol, ColumnSchema> entry : 
node.getAssignments().entrySet()) {
-      ColumnSchema columnSchema = entry.getValue();
-      if (columnSchema.getColumnCategory() == TsTableColumnCategory.TAG) {
-        table.addColumnSchema(
-            new TagColumnSchema(entry.getKey().getName(), 
getTSDataType(columnSchema.getType())));
-      }
-    }
-    SchemaFilter deviceFilter =
-        node.getTagPredicate()
-            .get()
-            .accept(
-                new ConvertSchemaPredicateToFilterVisitor(),
-                new ConvertSchemaPredicateToFilterVisitor.Context(table));
-    if (deviceFilter == null) {
-      throw new UnsupportedOperationException(
-          "Unsupported external TsFile device filter: " + 
node.getTagPredicate().get());
-    }
-    return deviceFilter;
-  }
-
   private AbstractTableScanOperator.AbstractTableScanOperatorParameter
       constructExternalTsFileTableScanOperatorParameter(
           ExternalTsFileScanNode node, LocalExecutionPlanContext context) {
@@ -1202,11 +1168,7 @@ public class DataNodeTableOperatorGenerator
 
     OperatorContext operatorContext =
         addOperatorContext(
-            context,
-            node.getPlanNodeId(),
-            node.getPushedOrderingScheme().isPresent()
-                ? OrderedExternalTsFileTableScanOperator.class.getSimpleName()
-                : 
UnorderedExternalTsFileTableScanOperator.class.getSimpleName());
+            context, node.getPlanNodeId(), 
ExternalTsFileTableScanOperator.class.getSimpleName());
 
     Set<String> allSensors = new 
HashSet<>(commonParameter.measurementColumnNames);
     // for time column
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 4338fe8ad43..3ab8b283d0b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -684,18 +684,10 @@ public class PlanGraphPrinter implements 
PlanVisitor<List<String>, PlanGraphPrin
     if (externalTsFileScanNode != null) {
       boxValue.add(String.format("ScanOrder: %s", 
externalTsFileScanNode.getScanOrder()));
       boxValue.add(String.format("TsFilePaths: %s", 
externalTsFileScanNode.getTsFilePaths()));
-      externalTsFileScanNode
-          .getPushedOrderingScheme()
-          .ifPresent(
-              orderingScheme ->
-                  boxValue.add(String.format("PushedOrderingScheme: %s", 
orderingScheme)));
       externalTsFileScanNode
           .getTimePredicate()
           .ifPresent(
               timePredicate -> boxValue.add(String.format("TimePredicate: %s", 
timePredicate)));
-      externalTsFileScanNode
-          .getTagPredicate()
-          .ifPresent(tagPredicate -> boxValue.add(String.format("TagPredicate: 
%s", tagPredicate)));
     }
 
     if (node.getPushDownPredicate() != null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 6c54af74445..d6cc700a14f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -73,6 +73,7 @@ import 
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.SymbolRefere
 import org.apache.iotdb.commons.schema.table.InformationSchema;
 import org.apache.iotdb.commons.schema.table.TsTable;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -731,16 +732,74 @@ public class TableDistributedPlanGenerator
   @Override
   public List<PlanNode> visitExternalTsFileScan(
       final ExternalTsFileScanNode node, final PlanContext context) {
-    node.setRegionReplicaSet(
-        new TRegionReplicaSet(
-            null, 
ImmutableList.of(DataNodeEndPoints.getLocalDataNodeLocation())));
+    TRegionReplicaSet localRegionReplicaSet =
+        new TRegionReplicaSet(null, 
ImmutableList.of(DataNodeEndPoints.getLocalDataNodeLocation()));
+    node.setRegionReplicaSet(localRegionReplicaSet);
     context.mostUsedRegion = node.getRegionReplicaSet();
     if (context.hasSortProperty) {
       processExternalTsFileSortProperty(node, context);
+      return Collections.singletonList(node);
+    }
+
+    List<PlanNode> splitNodes = splitExternalTsFileScanByDeviceEntries(node, 
localRegionReplicaSet);
+    if (!splitNodes.isEmpty()) {
+      return splitNodes;
     }
     return Collections.singletonList(node);
   }
 
+  private List<PlanNode> splitExternalTsFileScanByDeviceEntries(
+      final ExternalTsFileScanNode node, final TRegionReplicaSet 
localRegionReplicaSet) {
+    List<DeviceEntry> deviceEntries = node.getDeviceEntries();
+    if (deviceEntries.size() <= 1) {
+      return Collections.emptyList();
+    }
+
+    int splitCount =
+        Math.min(
+            deviceEntries.size(),
+            
IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism());
+    if (splitCount <= 1) {
+      return Collections.emptyList();
+    }
+
+    List<List<DeviceEntry>> splitDeviceEntries = new ArrayList<>(splitCount);
+    List<List<List<ExternalTsFileDeviceOffset>>> splitDeviceOffsets = new 
ArrayList<>(splitCount);
+    for (int i = 0; i < splitCount; i++) {
+      splitDeviceEntries.add(new ArrayList<>());
+      splitDeviceOffsets.add(new ArrayList<>());
+    }
+    for (int i = 0; i < deviceEntries.size(); i++) {
+      splitDeviceEntries.get(i % splitCount).add(deviceEntries.get(i));
+      splitDeviceOffsets.get(i % 
splitCount).add(node.getDeviceOffsets().get(i));
+    }
+
+    List<PlanNode> result = new ArrayList<>(splitCount);
+    for (int i = 0; i < splitDeviceEntries.size(); i++) {
+      List<DeviceEntry> entries = splitDeviceEntries.get(i);
+      if (entries.isEmpty()) {
+        continue;
+      }
+      ExternalTsFileScanNode splitNode =
+          new ExternalTsFileScanNode(
+              queryId.genPlanNodeId(),
+              node.getQualifiedObjectName(),
+              node.getOutputSymbols(),
+              node.getAssignments(),
+              node.getPushDownPredicate(),
+              node.getPushDownLimit(),
+              node.getPushDownOffset(),
+              node.getTimePredicate().orElse(null),
+              node.getScanOrder(),
+              node.getTsFilePaths(),
+              entries,
+              splitDeviceOffsets.get(i));
+      splitNode.setRegionReplicaSet(localRegionReplicaSet);
+      result.add(splitNode);
+    }
+    return result;
+  }
+
   private List<PlanNode> constructDeviceTableScanByTags(
       final DeviceTableScanNode node, final PlanContext context) {
     DataPartition dataPartition = analysis.getDataPartitionInfo();
@@ -1969,13 +2028,32 @@ public class TableDistributedPlanGenerator
       return;
     }
 
-    OrderingScheme pushedOrderingScheme =
+    OrderingScheme orderingScheme =
         new OrderingScheme(
             newOrderingSymbols,
             IntStream.range(0, newOrderingSymbols.size())
                 .boxed()
                 .collect(Collectors.toMap(newOrderingSymbols::get, 
newSortOrders::get)));
-    externalTsFileScanNode.setPushedOrderingScheme(pushedOrderingScheme);
+
+    Comparator<DeviceEntry> comparator =
+        createExternalTsFileDeviceEntryComparator(
+            externalTsFileScanNode, newOrderingSymbols, newSortOrders);
+    if (comparator != null) {
+      Map<IDeviceID, List<ExternalTsFileDeviceOffset>> offsetMap = new 
HashMap<>();
+      List<DeviceEntry> deviceEntries = 
externalTsFileScanNode.getDeviceEntries();
+      List<List<ExternalTsFileDeviceOffset>> deviceOffsets =
+          externalTsFileScanNode.getDeviceOffsets();
+      for (int i = 0; i < deviceEntries.size(); i++) {
+        offsetMap.put(deviceEntries.get(i).getDeviceID(), 
deviceOffsets.get(i));
+      }
+      externalTsFileScanNode.getDeviceEntries().sort(comparator);
+      List<List<ExternalTsFileDeviceOffset>> sortedDeviceOffsets =
+          new ArrayList<>(externalTsFileScanNode.getDeviceEntries().size());
+      for (DeviceEntry deviceEntry : 
externalTsFileScanNode.getDeviceEntries()) {
+        sortedDeviceOffsets.add(offsetMap.get(deviceEntry.getDeviceID()));
+      }
+      externalTsFileScanNode.setDeviceOffsets(sortedDeviceOffsets);
+    }
 
     if (lastIsTimeRelated) {
       if (newOrderingSymbols.size() > 1
@@ -1989,14 +2067,66 @@ public class TableDistributedPlanGenerator
                       .boxed()
                       .collect(Collectors.toMap(newOrderingSymbols::get, 
newSortOrders::get))),
               newOrderingSymbols.size() - 2)) {
-        nodeOrderingMap.put(externalTsFileScanNode.getPlanNodeId(), 
pushedOrderingScheme);
+        nodeOrderingMap.put(externalTsFileScanNode.getPlanNodeId(), 
orderingScheme);
       }
       return;
     }
 
     if (newOrderingSymbols.size() == 
expectedOrderingScheme.getOrderBy().size()) {
-      nodeOrderingMap.put(externalTsFileScanNode.getPlanNodeId(), 
pushedOrderingScheme);
+      nodeOrderingMap.put(externalTsFileScanNode.getPlanNodeId(), 
orderingScheme);
+    }
+  }
+
+  /**
+   * Builds the comparator used to order the devices of an external TsFile scan so that they
+   * follow the leading tag columns of the requested ordering.
+   *
+   * <p>Ordering symbols are consumed from left to right until the time column is reached. Each
+   * consumed symbol contributes a comparison on the device-id segment at {@code tagIndex + 1};
+   * a segment that does not exist compares as {@code null}. The device id itself is appended as
+   * the final tie-breaker.
+   *
+   * @param externalTsFileScanNode scan node whose assignments resolve a symbol to its tag index
+   * @param orderingSymbols ordering symbols, most significant first
+   * @param sortOrders sort order of each symbol, parallel to {@code orderingSymbols}
+   * @return the combined comparator, or {@code null} when no symbol precedes the time column
+   * @throws IllegalArgumentException if a symbol before the time column is not a tag column of
+   *     the scan node
+   */
+  private Comparator<DeviceEntry> createExternalTsFileDeviceEntryComparator(
+      ExternalTsFileScanNode externalTsFileScanNode,
+      List<Symbol> orderingSymbols,
+      List<SortOrder> sortOrders) {
+    Comparator<DeviceEntry> comparator = null;
+    for (int i = 0; i < orderingSymbols.size(); i++) {
+      Symbol symbol = orderingSymbols.get(i);
+      if (externalTsFileScanNode.isTimeColumn(symbol)) {
+        break;
+      }
+      int tagIndex = getExternalTsFileTagIndex(externalTsFileScanNode, symbol);
+      final int deviceSegmentIndex = tagIndex + 1;
+      SortOrder sortOrder = sortOrders.get(i);
+      // Apply the direction to the non-null values first and only then decide where nulls go.
+      // Calling reversed() on an already null-aware comparator would also flip the null
+      // placement (for example DESC NULLS FIRST would end up sorting nulls last).
+      Comparator<String> directedComparator =
+          sortOrder.isAscending()
+              ? Comparator.<String>naturalOrder()
+              : Comparator.<String>reverseOrder();
+      Comparator<String> valueComparator =
+          sortOrder.isNullsFirst()
+              ? Comparator.nullsFirst(directedComparator)
+              : Comparator.nullsLast(directedComparator);
+      Comparator<DeviceEntry> currentComparator =
+          Comparator.comparing(
+              deviceEntry -> getDeviceSegment(deviceEntry.getDeviceID(), deviceSegmentIndex),
+              valueComparator);
+      comparator =
+          comparator == null ? currentComparator : comparator.thenComparing(currentComparator);
     }
+    return comparator == null ? null : comparator.thenComparing(DeviceEntry::getDeviceID);
+  }
+
+  /**
+   * Resolves the position of {@code symbol} among the TAG columns of the scan node's
+   * assignments, counted in the iteration order of the assignment map.
+   *
+   * <p>NOTE(review): the index is relative to the tag columns that are present in the
+   * assignments; confirm that tag columns can never be pruned from them, otherwise the index
+   * would no longer line up with the device-id segments.
+   *
+   * @param externalTsFileScanNode scan node providing the symbol-to-column assignments
+   * @param symbol ordering symbol to look up
+   * @return zero-based index of the symbol among the TAG columns
+   * @throws IllegalArgumentException if {@code symbol} is not assigned to a TAG column
+   */
+  private int getExternalTsFileTagIndex(
+      ExternalTsFileScanNode externalTsFileScanNode, Symbol symbol) {
+    int seenTags = 0;
+    for (Map.Entry<Symbol, ColumnSchema> assignment :
+        externalTsFileScanNode.getAssignments().entrySet()) {
+      if (assignment.getValue().getColumnCategory() == TsTableColumnCategory.TAG) {
+        if (assignment.getKey().equals(symbol)) {
+          return seenTags;
+        }
+        seenTags++;
+      }
+    }
+    throw new IllegalArgumentException("Unexpected external TsFile ordering symbol: " + symbol);
+  }
+
+  /**
+   * Returns the device-id segment at {@code deviceSegmentIndex} cast to a string, or {@code null}
+   * when the device id has no segment at that index.
+   */
+  private String getDeviceSegment(IDeviceID deviceID, int deviceSegmentIndex) {
+    if (deviceSegmentIndex >= deviceID.segmentNum()) {
+      return null;
+    }
+    return (String) deviceID.segment(deviceSegmentIndex);
   }
 
   private Optional<IDeviceID.TreeDeviceIdColumnValueExtractor>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
index 5a7ab6eadfa..42c1c8025a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
@@ -188,11 +188,11 @@ public class PruneTableScanColumns extends 
ProjectOffPushDownRule<TableScanNode>
               externalTsFileScanNode.getPushDownPredicate(),
               externalTsFileScanNode.getPushDownLimit(),
               externalTsFileScanNode.getPushDownOffset(),
-              externalTsFileScanNode.getTagPredicate().orElse(null),
               externalTsFileScanNode.getTimePredicate().orElse(null),
               externalTsFileScanNode.getScanOrder(),
-              externalTsFileScanNode.getPushedOrderingScheme().orElse(null),
-              externalTsFileScanNode.getTsFilePaths()));
+              externalTsFileScanNode.getTsFilePaths(),
+              externalTsFileScanNode.getDeviceEntries(),
+              externalTsFileScanNode.getDeviceOffsets()));
     } else if (node instanceof InformationSchemaTableScanNode) {
       // For the convenience of process in execution stage, column-prune for
       // InformationSchemaTableScanNode is
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java
index e60cebfdf29..71184f21eed 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java
@@ -21,18 +21,16 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.node;
 
 import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.IPlanVisitor;
 import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
-import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
 import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.TableScanNode;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
-import 
org.apache.iotdb.commons.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression;
+import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 
-import org.apache.tsfile.utils.ReadWriteIOUtils;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -44,10 +42,10 @@ import java.util.Optional;
 
 public class ExternalTsFileScanNode extends TableScanNode {
   private List<String> tsFilePaths;
-  private Expression tagPredicate;
   private Expression timePredicate;
   private Ordering scanOrder = Ordering.ASC;
-  private OrderingScheme pushedOrderingScheme;
+  private List<DeviceEntry> deviceEntries = Collections.emptyList();
+  private List<List<ExternalTsFileDeviceOffset>> deviceOffsets = 
Collections.emptyList();
 
   protected ExternalTsFileScanNode() {}
 
@@ -57,8 +55,28 @@ public class ExternalTsFileScanNode extends TableScanNode {
       List<Symbol> outputSymbols,
       Map<Symbol, ColumnSchema> assignments,
       List<String> tsFilePaths) {
+    this(
+        id,
+        qualifiedObjectName,
+        outputSymbols,
+        assignments,
+        tsFilePaths,
+        Collections.emptyList(),
+        Collections.emptyList());
+  }
+
+  public ExternalTsFileScanNode(
+      PlanNodeId id,
+      QualifiedObjectName qualifiedObjectName,
+      List<Symbol> outputSymbols,
+      Map<Symbol, ColumnSchema> assignments,
+      List<String> tsFilePaths,
+      List<DeviceEntry> deviceEntries,
+      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
     super(id, qualifiedObjectName, outputSymbols, assignments);
     this.tsFilePaths = Collections.unmodifiableList(new 
ArrayList<>(tsFilePaths));
+    this.deviceEntries = new ArrayList<>(deviceEntries);
+    this.deviceOffsets = copyDeviceOffsets(deviceOffsets);
   }
 
   public ExternalTsFileScanNode(
@@ -70,7 +88,6 @@ public class ExternalTsFileScanNode extends TableScanNode {
       long pushDownLimit,
       long pushDownOffset,
       Ordering scanOrder,
-      OrderingScheme pushedOrderingScheme,
       List<String> tsFilePaths) {
     this(
         id,
@@ -80,11 +97,35 @@ public class ExternalTsFileScanNode extends TableScanNode {
         pushDownPredicate,
         pushDownLimit,
         pushDownOffset,
-        null,
+        scanOrder,
+        tsFilePaths,
+        Collections.emptyList());
+  }
+
+  public ExternalTsFileScanNode(
+      PlanNodeId id,
+      QualifiedObjectName qualifiedObjectName,
+      List<Symbol> outputSymbols,
+      Map<Symbol, ColumnSchema> assignments,
+      Expression pushDownPredicate,
+      long pushDownLimit,
+      long pushDownOffset,
+      Ordering scanOrder,
+      List<String> tsFilePaths,
+      List<DeviceEntry> deviceEntries) {
+    this(
+        id,
+        qualifiedObjectName,
+        outputSymbols,
+        assignments,
+        pushDownPredicate,
+        pushDownLimit,
+        pushDownOffset,
         null,
         scanOrder,
-        pushedOrderingScheme,
-        tsFilePaths);
+        tsFilePaths,
+        deviceEntries,
+        Collections.emptyList());
   }
 
   public ExternalTsFileScanNode(
@@ -95,11 +136,37 @@ public class ExternalTsFileScanNode extends TableScanNode {
       Expression pushDownPredicate,
       long pushDownLimit,
       long pushDownOffset,
-      Expression tagPredicate,
       Expression timePredicate,
       Ordering scanOrder,
-      OrderingScheme pushedOrderingScheme,
       List<String> tsFilePaths) {
+    this(
+        id,
+        qualifiedObjectName,
+        outputSymbols,
+        assignments,
+        pushDownPredicate,
+        pushDownLimit,
+        pushDownOffset,
+        timePredicate,
+        scanOrder,
+        tsFilePaths,
+        Collections.emptyList(),
+        Collections.emptyList());
+  }
+
+  public ExternalTsFileScanNode(
+      PlanNodeId id,
+      QualifiedObjectName qualifiedObjectName,
+      List<Symbol> outputSymbols,
+      Map<Symbol, ColumnSchema> assignments,
+      Expression pushDownPredicate,
+      long pushDownLimit,
+      long pushDownOffset,
+      Expression timePredicate,
+      Ordering scanOrder,
+      List<String> tsFilePaths,
+      List<DeviceEntry> deviceEntries,
+      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
     super(
         id,
         qualifiedObjectName,
@@ -108,11 +175,11 @@ public class ExternalTsFileScanNode extends TableScanNode 
{
         pushDownPredicate,
         pushDownLimit,
         pushDownOffset);
-    this.tagPredicate = tagPredicate;
     this.timePredicate = timePredicate;
     this.scanOrder = scanOrder;
-    this.pushedOrderingScheme = pushedOrderingScheme;
     this.tsFilePaths = Collections.unmodifiableList(new 
ArrayList<>(tsFilePaths));
+    this.deviceEntries = new ArrayList<>(deviceEntries);
+    this.deviceOffsets = copyDeviceOffsets(deviceOffsets);
   }
 
   @Override
@@ -130,23 +197,41 @@ public class ExternalTsFileScanNode extends TableScanNode 
{
         pushDownPredicate,
         pushDownLimit,
         pushDownOffset,
-        tagPredicate,
         timePredicate,
         scanOrder,
-        pushedOrderingScheme,
-        tsFilePaths);
+        tsFilePaths,
+        deviceEntries,
+        deviceOffsets);
   }
 
   public List<String> getTsFilePaths() {
     return tsFilePaths;
   }
 
-  public Optional<Expression> getTagPredicate() {
-    return Optional.ofNullable(tagPredicate);
+  /**
+   * Returns the device entries held by this node. The internal list is returned directly, not a
+   * copy, so changes made through the returned list are visible to this node.
+   */
+  public List<DeviceEntry> getDeviceEntries() {
+    return deviceEntries;
   }
 
-  public void setTagPredicate(Expression tagPredicate) {
-    this.tagPredicate = tagPredicate;
+  /** Replaces the device entries of this node with a new list copied from the argument. */
+  public void setDeviceEntries(List<DeviceEntry> deviceEntries) {
+    this.deviceEntries = new ArrayList<>(deviceEntries);
+  }
+
+  /** Returns the per-device offset lists held by this node; the internal list is not copied. */
+  public List<List<ExternalTsFileDeviceOffset>> getDeviceOffsets() {
+    return deviceOffsets;
+  }
+
+  /**
+   * Replaces the per-device offset lists of this node with a copy of the argument: the outer
+   * list and every inner list are copied.
+   */
+  public void setDeviceOffsets(List<List<ExternalTsFileDeviceOffset>> 
deviceOffsets) {
+    this.deviceOffsets = copyDeviceOffsets(deviceOffsets);
+  }
+
+  /**
+   * Copies the outer list and each per-device offset list into new {@link ArrayList}s. The
+   * offset objects themselves are not copied.
+   */
+  private static List<List<ExternalTsFileDeviceOffset>> copyDeviceOffsets(
+      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
+    List<List<ExternalTsFileDeviceOffset>> copy = new ArrayList<>(deviceOffsets.size());
+    deviceOffsets.forEach(offsets -> copy.add(new ArrayList<>(offsets)));
+    return copy;
   }
 
   public Optional<Expression> getTimePredicate() {
@@ -165,104 +250,21 @@ public class ExternalTsFileScanNode extends 
TableScanNode {
     this.scanOrder = scanOrder;
   }
 
-  public Optional<OrderingScheme> getPushedOrderingScheme() {
-    return Optional.ofNullable(pushedOrderingScheme);
-  }
-
-  public void setPushedOrderingScheme(OrderingScheme pushedOrderingScheme) {
-    this.pushedOrderingScheme = pushedOrderingScheme;
-  }
-
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
-    PlanNodeType.EXTERNAL_TSFILE_SCAN_NODE.serialize(byteBuffer);
-    TableScanNode.serializeMemberVariables(this, byteBuffer, true);
-    serializePredicate(tagPredicate, byteBuffer);
-    serializePredicate(timePredicate, byteBuffer);
-    ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
-    if (pushedOrderingScheme == null) {
-      ReadWriteIOUtils.write(false, byteBuffer);
-    } else {
-      ReadWriteIOUtils.write(true, byteBuffer);
-      pushedOrderingScheme.serialize(byteBuffer);
-    }
-    serializeTsFilePaths(byteBuffer);
+    throw new UnsupportedOperationException(
+        "ExternalTsFileScanNode cannot be serialized because it reads local 
external TsFiles");
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
-    PlanNodeType.EXTERNAL_TSFILE_SCAN_NODE.serialize(stream);
-    TableScanNode.serializeMemberVariables(this, stream, true);
-    serializePredicate(tagPredicate, stream);
-    serializePredicate(timePredicate, stream);
-    ReadWriteIOUtils.write(scanOrder.ordinal(), stream);
-    if (pushedOrderingScheme == null) {
-      ReadWriteIOUtils.write(false, stream);
-    } else {
-      ReadWriteIOUtils.write(true, stream);
-      pushedOrderingScheme.serialize(stream);
-    }
-    serializeTsFilePaths(stream);
-  }
-
-  private void serializeTsFilePaths(ByteBuffer byteBuffer) {
-    ReadWriteIOUtils.write(tsFilePaths.size(), byteBuffer);
-    for (String tsFilePath : tsFilePaths) {
-      ReadWriteIOUtils.write(tsFilePath, byteBuffer);
-    }
-  }
-
-  private void serializeTsFilePaths(DataOutputStream stream) throws 
IOException {
-    ReadWriteIOUtils.write(tsFilePaths.size(), stream);
-    for (String tsFilePath : tsFilePaths) {
-      ReadWriteIOUtils.write(tsFilePath, stream);
-    }
-  }
-
-  private void serializePredicate(Expression predicate, ByteBuffer byteBuffer) 
{
-    if (predicate == null) {
-      ReadWriteIOUtils.write(false, byteBuffer);
-    } else {
-      ReadWriteIOUtils.write(true, byteBuffer);
-      Expression.serialize(predicate, byteBuffer);
-    }
-  }
-
-  private void serializePredicate(Expression predicate, DataOutputStream 
stream)
-      throws IOException {
-    if (predicate == null) {
-      ReadWriteIOUtils.write(false, stream);
-    } else {
-      ReadWriteIOUtils.write(true, stream);
-      Expression.serialize(predicate, stream);
-    }
+    throw new UnsupportedOperationException(
+        "ExternalTsFileScanNode cannot be serialized because it reads local 
external TsFiles");
   }
 
   public static ExternalTsFileScanNode deserialize(ByteBuffer byteBuffer) {
-    ExternalTsFileScanNode node = new ExternalTsFileScanNode();
-    TableScanNode.deserializeMemberVariables(byteBuffer, node, true);
-
-    if (ReadWriteIOUtils.readBool(byteBuffer)) {
-      node.tagPredicate = Expression.deserialize(byteBuffer);
-    }
-    if (ReadWriteIOUtils.readBool(byteBuffer)) {
-      node.timePredicate = Expression.deserialize(byteBuffer);
-    }
-
-    node.scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
-    if (ReadWriteIOUtils.readBool(byteBuffer)) {
-      node.pushedOrderingScheme = OrderingScheme.deserialize(byteBuffer);
-    }
-
-    int size = ReadWriteIOUtils.readInt(byteBuffer);
-    List<String> tsFilePaths = new ArrayList<>(size);
-    while (size-- > 0) {
-      tsFilePaths.add(ReadWriteIOUtils.readString(byteBuffer));
-    }
-    node.tsFilePaths = Collections.unmodifiableList(tsFilePaths);
-
-    node.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
-    return node;
+    throw new UnsupportedOperationException(
+        "ExternalTsFileScanNode cannot be deserialized because it reads local 
external TsFiles");
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index 361e2d540bb..05df24c0cd6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -47,12 +47,18 @@ import 
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Node;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.NullLiteral;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.StringLiteral;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.SymbolReference;
+import 
org.apache.iotdb.commons.queryengine.plan.relational.type.InternalTypeManager;
 import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils;
+import org.apache.iotdb.commons.schema.filter.SchemaFilter;
 import org.apache.iotdb.commons.schema.table.InformationSchema;
+import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.TagColumnSchema;
+import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileDeviceFilterVisitor;
 import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
@@ -61,6 +67,8 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.ConvertPredicateToTimeFilterVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicateCombineIntoTableScanChecker;
 import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicatePushIntoMetadataChecker;
+import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.schema.ConvertSchemaPredicateToFilterVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.NonAlignedDeviceEntry;
@@ -79,18 +87,24 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceVi
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.LazyTsFileDeviceIterator;
+import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.common.type.Type;
 import org.apache.tsfile.read.filter.basic.Filter;
+import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -118,6 +132,7 @@ import static 
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATE_
 import static 
org.apache.iotdb.commons.schema.table.InformationSchema.CURRENT_QUERIES;
 import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ATTRIBUTE;
 import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.FIELD;
+import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TAG;
 import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.SCHEMA_FETCHER;
@@ -476,6 +491,7 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
     public PlanNode visitExternalTsFileScan(
         ExternalTsFileScanNode tableScanNode, RewriteContext context) {
       if (TRUE_LITERAL.equals(context.inheritedPredicate)) {
+        collectExternalTsFileDeviceTasks(tableScanNode, 
Collections.emptyList());
         return tableScanNode;
       }
 
@@ -524,11 +540,8 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
         getDeviceEntriesWithDataPartitions(
             (DeviceTableScanNode) tableScanNode, 
splitExpression.getMetadataExpressions());
       } else if (tableScanNode instanceof ExternalTsFileScanNode) {
-        ((ExternalTsFileScanNode) tableScanNode)
-            .setTagPredicate(
-                splitExpression.getMetadataExpressions().isEmpty()
-                    ? null
-                    : 
combineConjuncts(splitExpression.getMetadataExpressions()));
+        collectExternalTsFileDeviceTasks(
+            (ExternalTsFileScanNode) tableScanNode, 
splitExpression.getMetadataExpressions());
       }
 
       // exist expressions can not push down to scan operator
@@ -545,6 +558,77 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
       return tableScanNode;
     }
 
+    /**
+     * Scans every external TsFile of the node, keeps the devices accepted by the tag predicates
+     * and stores them on the node. For each kept device the node also receives one offset entry
+     * per file that contains it, holding the file path and the measurement-node offsets reported
+     * by the device iterator.
+     *
+     * @param tableScanNode node whose device entries and device offsets are replaced
+     * @param metadataExpressions tag predicates; with an empty list no device is filtered out
+     * @throws SemanticException if a TsFile cannot be read
+     * @throws UnsupportedOperationException if the predicates cannot be converted to a filter
+     */
+    private void collectExternalTsFileDeviceTasks(
+        ExternalTsFileScanNode tableScanNode, List<Expression> metadataExpressions) {
+      SchemaFilter deviceFilter =
+          constructExternalTsFileDeviceFilter(tableScanNode, metadataExpressions);
+      ExternalTsFileDeviceFilterVisitor deviceFilterVisitor =
+          new ExternalTsFileDeviceFilterVisitor();
+      // Insertion-ordered so devices keep the order in which they were first encountered.
+      Map<IDeviceID, List<ExternalTsFileDeviceOffset>> deviceOffsetMap = new LinkedHashMap<>();
+      for (String tsFilePath : tableScanNode.getTsFilePaths()) {
+        try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFilePath)) {
+          LazyTsFileDeviceIterator deviceIterator =
+              new LazyTsFileDeviceIterator(
+                  reader, tableScanNode.getQualifiedObjectName().getObjectName(), ignored -> {});
+          while (deviceIterator.hasNext()) {
+            IDeviceID deviceID = deviceIterator.next();
+            // Only devices for which the filter evaluates to TRUE are kept.
+            if (deviceFilter != null
+                && !Boolean.TRUE.equals(deviceFilter.accept(deviceFilterVisitor, deviceID))) {
+              continue;
+            }
+            deviceOffsetMap
+                .computeIfAbsent(deviceID, ignored -> new ArrayList<>())
+                .add(
+                    new ExternalTsFileDeviceOffset(
+                        tsFilePath, deviceIterator.getCurrentDeviceMeasurementNodeOffset()));
+          }
+        } catch (IOException e) {
+          // Keep the underlying I/O reason in the message so the failure stays diagnosable.
+          throw new SemanticException(
+              "Failed to collect devices from external TsFile: "
+                  + tsFilePath
+                  + ", cause: "
+                  + e.getMessage());
+        }
+      }
+      // Split the map into two parallel lists: index i of both lists describes the same device.
+      List<DeviceEntry> deviceEntries = new ArrayList<>(deviceOffsetMap.size());
+      List<List<ExternalTsFileDeviceOffset>> deviceOffsets =
+          new ArrayList<>(deviceOffsetMap.size());
+      for (Map.Entry<IDeviceID, List<ExternalTsFileDeviceOffset>> entry :
+          deviceOffsetMap.entrySet()) {
+        deviceEntries.add(new AlignedDeviceEntry(entry.getKey(), new Binary[0]));
+        deviceOffsets.add(entry.getValue());
+      }
+      tableScanNode.setDeviceEntries(deviceEntries);
+      tableScanNode.setDeviceOffsets(deviceOffsets);
+    }
+
+    /**
+     * Converts the tag predicates of an external TsFile scan into a {@link SchemaFilter}.
+     *
+     * <p>The conversion context is a temporary {@link TsTable} named after the scanned object and
+     * containing only the TAG columns found in the node's assignments. Several predicates are
+     * combined with a logical AND before conversion.
+     *
+     * @param tableScanNode scan node providing the table name and the column assignments
+     * @param metadataExpressions tag predicates to convert
+     * @return the converted filter, or {@code null} when there are no predicates
+     * @throws UnsupportedOperationException if the conversion yields no filter
+     */
+    private SchemaFilter constructExternalTsFileDeviceFilter(
+        ExternalTsFileScanNode tableScanNode, List<Expression> metadataExpressions) {
+      if (metadataExpressions.isEmpty()) {
+        return null;
+      }
+
+      TsTable tagOnlyTable = new TsTable(tableScanNode.getQualifiedObjectName().getObjectName());
+      tableScanNode
+          .getAssignments()
+          .forEach(
+              (symbol, columnSchema) -> {
+                if (TAG.equals(columnSchema.getColumnCategory())) {
+                  tagOnlyTable.addColumnSchema(
+                      new TagColumnSchema(
+                          symbol.getName(),
+                          InternalTypeManager.getTSDataType(columnSchema.getType())));
+                }
+              });
+
+      Expression conjunction;
+      if (metadataExpressions.size() == 1) {
+        conjunction = metadataExpressions.get(0);
+      } else {
+        conjunction = new LogicalExpression(LogicalExpression.Operator.AND, metadataExpressions);
+      }
+
+      SchemaFilter converted =
+          conjunction.accept(
+              new ConvertSchemaPredicateToFilterVisitor(),
+              new ConvertSchemaPredicateToFilterVisitor.Context(tagOnlyTable));
+      if (converted != null) {
+        return converted;
+      }
+      throw new UnsupportedOperationException(
+          "Unsupported external TsFile device filter: " + conjunction);
+    }
+
     interface InformationSchemaTablePredicatePushDownChecker {
       boolean canPushDown(Expression expression);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
index 389a91f300b..c2f98dd0119 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
@@ -296,11 +296,11 @@ public class UnaliasSymbolReferences implements 
PlanOptimizer {
               node.getPushDownPredicate() == null ? null : 
mapper.map(node.getPushDownPredicate()),
               node.getPushDownLimit(),
               node.getPushDownOffset(),
-              node.getTagPredicate().map(mapper::map).orElse(null),
               node.getTimePredicate().map(mapper::map).orElse(null),
               node.getScanOrder(),
-              node.getPushedOrderingScheme().map(mapper::map).orElse(null),
-              node.getTsFilePaths()),
+              node.getTsFilePaths(),
+              node.getDeviceEntries(),
+              node.getDeviceOffsets()),
           mapping);
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java
index 82c4133b339..d2f92f6e287 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java
@@ -30,12 +30,10 @@ import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
 import org.apache.iotdb.udf.api.relational.table.argument.Argument;
 import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
 import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
-import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionLeafProcessor;
 import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
 import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
 import org.apache.iotdb.udf.api.type.Type;
 
-import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.enums.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
@@ -116,13 +114,8 @@ public class ReadTsFileTableFunction implements 
TableFunction {
   @Override
   public TableFunctionProcessorProvider getProcessorProvider(
       TableFunctionHandle tableFunctionHandle) {
-    ReadTsFileTableFunctionHandle handle = (ReadTsFileTableFunctionHandle) 
tableFunctionHandle;
-    return new TableFunctionProcessorProvider() {
-      @Override
-      public TableFunctionLeafProcessor getSplitProcessor() {
-        return new ReadTsFileLeafProcessor(handle);
-      }
-    };
+    throw new UnsupportedOperationException(
+        "read_tsfile must be planned as an ExternalTsFileScanNode");
   }
 
   private static String getRequiredStringArgument(Map<String, Argument> 
arguments, String name) {
@@ -321,33 +314,27 @@ public class ReadTsFileTableFunction implements 
TableFunction {
     return builder.build();
   }
 
-  private static class ReadTsFileLeafProcessor implements 
TableFunctionLeafProcessor {
-    private final ReadTsFileTableFunctionHandle handle;
-    private boolean finished;
-
-    private ReadTsFileLeafProcessor(ReadTsFileTableFunctionHandle handle) {
-      this.handle = handle;
-    }
+  /**
+   * Location of one device inside one external TsFile: the file path plus the measurement-node
+   * offsets supplied for that device. The offset array is copied on construction and on every
+   * read; a {@code null} array is kept as {@code null}.
+   */
+  public static class ExternalTsFileDeviceOffset {
 
-    @Override
-    public void beforeStart() {
-      // TODO: Open TsFile resources here.
-      finished = true;
-    }
+    private final String tsFilePath;
+    private final long[] deviceMeasurementNodeOffset;
 
-    @Override
-    public void process(List<ColumnBuilder> columnBuilders) {
-      // TODO: Read one batch from TsFile resources and write values into 
column builders.
+    /**
+     * @param tsFilePath path of the TsFile containing the device
+     * @param deviceMeasurementNodeOffset offsets for the device, may be {@code null}
+     */
+    public ExternalTsFileDeviceOffset(String tsFilePath, long[] 
deviceMeasurementNodeOffset) {
+      this.tsFilePath = tsFilePath;
+      // Defensive copy: the caller's array is not retained.
+      this.deviceMeasurementNodeOffset =
+          deviceMeasurementNodeOffset == null
+              ? null
+              : Arrays.copyOf(deviceMeasurementNodeOffset, 
deviceMeasurementNodeOffset.length);
     }
 
-    @Override
-    public boolean isFinish() {
-      return finished;
+    /** Returns the path of the TsFile containing the device. */
+    public String getTsFilePath() {
+      return tsFilePath;
     }
 
-    @Override
-    public void beforeDestroy() {
-      // TODO: Close TsFile resources here.
+    /** Returns a fresh copy of the offsets, or {@code null} when none were supplied. */
+    public long[] getDeviceMeasurementNodeOffset() {
+      return deviceMeasurementNodeOffset == null
+          ? null
+          : Arrays.copyOf(deviceMeasurementNodeOffset, 
deviceMeasurementNodeOffset.length);
     }
   }
 

Reply via email to