This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch read_tsfile_table_function
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4be16d3042a8f65a267a8f26ead6ffeb59989c72
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jun 5 17:16:47 2026 +0800

    agg scan
---
 .../relational/AbstractAggTableScanOperator.java   |   2 +-
 .../AbstractDefaultAggTableScanOperator.java       |   2 +-
 ...ava => ExternalTsFileAggTableScanOperator.java} |  62 +++----
 .../relational/ExternalTsFileSeriesScanUtil.java   |  42 +++++
 .../ExternalTsFileTableScanOperator.java           |  31 +---
 .../planner/DataNodeTableOperatorGenerator.java    |  29 ++++
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../distribute/TableDistributedPlanGenerator.java  |  77 +++++++++
 .../planner/node/AggregationTableScanNode.java     |  50 ++++++
 .../node/ExternalTsFileAggregationScanNode.java    | 189 +++++++++++++++++++++
 .../PushAggregationIntoTableScan.java              |  29 +++-
 11 files changed, 443 insertions(+), 75 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
index 1f37ce9f104..e3030c0752c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
@@ -96,7 +96,7 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe
 
   protected SeriesScanOptions seriesScanOptions;
   private final boolean ascending;
-  private final Ordering scanOrder;
+  protected final Ordering scanOrder;
   // Some special data types(like BLOB) cannot use statistics
   protected final boolean canUseStatistics;
   private final long cachedRawDataSize;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDefaultAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDefaultAggTableScanOperator.java
index f00b0049ef6..797d4491e41 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDefaultAggTableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDefaultAggTableScanOperator.java
@@ -34,7 +34,7 @@ import static org.apache.iotdb.calc.plan.planner.CommonOperatorUtils.TIME_COLUMN
 
 public abstract class AbstractDefaultAggTableScanOperator extends AbstractAggTableScanOperator {
 
-  private static final long INSTANCE_SIZE =
+  protected static final long INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(AbstractDefaultAggTableScanOperator.class);
 
   protected AbstractDefaultAggTableScanOperator(AbstractAggTableScanOperatorParameter parameter) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java
similarity index 66%
copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java
index daccb199d28..79a101a6a79 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java
@@ -22,28 +22,30 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 import org.apache.iotdb.commons.path.AlignedFullPath;
 import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils;
+import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanUtil;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata;
+import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-public class ExternalTsFileTableScanOperator extends TableScanOperator {
+import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
+
+public class ExternalTsFileAggTableScanOperator extends DefaultAggTableScanOperator {
   private static final long INSTANCE_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(ExternalTsFileTableScanOperator.class);
-  private static final long ABSTRACT_DEVICE_TABLE_SCAN_OPERATOR_INSTANCE_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(AbstractDeviceTableScanOperator.class);
+      RamUsageEstimator.shallowSizeOfInstance(ExternalTsFileAggTableScanOperator.class);
 
   private final String tableName;
   private final List<List<ExternalTsFileDeviceOffset>> deviceOffsets;
 
-  public ExternalTsFileTableScanOperator(
-      AbstractTableScanOperatorParameter parameter,
+  public ExternalTsFileAggTableScanOperator(
+      AbstractAggTableScanOperatorParameter parameter,
       String tableName,
       List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
     super(parameter);
@@ -68,15 +70,10 @@ public class ExternalTsFileTableScanOperator extends TableScanOperator {
 
   @Override
   protected void constructAlignedSeriesScanUtil() {
-    if (!hasCurrentDeviceEntry()) {
-      return;
-    }
-
-    DeviceEntry deviceEntry = getCurrentDeviceEntry();
-    if (deviceEntry == null) {
-      throw new IllegalStateException("Current device entry in TableScanOperator is empty");
-    }
-
+    DeviceEntry deviceEntry =
+        deviceEntries.isEmpty() || deviceEntries.get(currentDeviceIndex) == null
+            ? new AlignedDeviceEntry(SeriesScanUtil.EMPTY_DEVICE_ID, new Binary[0])
+            : deviceEntries.get(currentDeviceIndex);
     this.seriesScanUtil =
         new ExternalTsFileSeriesScanUtil(
             constructAlignedPath(
@@ -91,43 +88,24 @@ public class ExternalTsFileTableScanOperator extends TableScanOperator {
 
   private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata(
       TsFileResource resource, AlignedFullPath alignedPath) throws IOException {
-    List<ExternalTsFileDeviceOffset> currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex);
-    if (currentDeviceOffsets == null
-        || !getCurrentDeviceEntry().getDeviceID().equals(alignedPath.getDeviceId())) {
+    if (deviceEntries.isEmpty() || currentDeviceIndex >= deviceEntries.size()) {
       return null;
     }
-
-    long[] deviceMeasurementNodeOffset =
-        getDeviceMeasurementNodeOffset(currentDeviceOffsets, resource.getTsFilePath());
-    if (deviceMeasurementNodeOffset == null) {
-      return null;
-    }
-    // TODO: Use deviceMeasurementNodeOffset after FileLoaderUtils supports offset-based metadata
-    // loading in this branch.
-    return FileLoaderUtils.loadAlignedTimeSeriesMetadata(
+    List<ExternalTsFileDeviceOffset> currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex);
+    return ExternalTsFileSeriesScanUtil.loadTimeSeriesMetadata(
         resource,
         alignedPath,
+        deviceEntries.get(currentDeviceIndex).getDeviceID(),
+        currentDeviceOffsets,
         ((OperatorContext) operatorContext).getInstanceContext(),
-        seriesScanOptions.getGlobalTimeFilter(),
-        resource.isSeq(),
-        ((OperatorContext) operatorContext).getInstanceContext().isIgnoreAllNullRows());
-  }
-
-  private long[] getDeviceMeasurementNodeOffset(
-      List<ExternalTsFileDeviceOffset> currentDeviceOffsets, String tsFilePath) {
-    for (ExternalTsFileDeviceOffset offset : currentDeviceOffsets) {
-      if (tsFilePath.equals(offset.getTsFilePath())) {
-        return offset.getDeviceMeasurementNodeOffset();
-      }
-    }
-    return null;
+        seriesScanOptions.getGlobalTimeFilter());
   }
 
   @Override
   public long ramBytesUsed() {
     return super.ramBytesUsed()
         + INSTANCE_SIZE
-        - ABSTRACT_DEVICE_TABLE_SCAN_OPERATOR_INSTANCE_SIZE
+        - AbstractDefaultAggTableScanOperator.INSTANCE_SIZE
         + RamUsageEstimator.sizeOfCollection(deviceOffsets);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
index f8e7766ffbe..f46e3b04346 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
@@ -20,8 +20,10 @@
 package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 
 import org.apache.iotdb.commons.path.AlignedFullPath;
+import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil;
+import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
@@ -29,6 +31,8 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
 import java.util.List;
@@ -78,6 +82,44 @@ public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil {
     // External TsFiles are not managed by IoTDB metadata, so no table/tree TTL applies here.
   }
 
+  static AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata(
+      TsFileResource resource,
+      AlignedFullPath alignedPath,
+      IDeviceID currentDeviceID,
+      List<ExternalTsFileDeviceOffset> currentDeviceOffsets,
+      FragmentInstanceContext context,
+      Filter globalTimeFilter)
+      throws IOException {
+    if (currentDeviceOffsets == null || !currentDeviceID.equals(alignedPath.getDeviceId())) {
+      return null;
+    }
+
+    long[] deviceMeasurementNodeOffset =
+        getDeviceMeasurementNodeOffset(currentDeviceOffsets, resource.getTsFilePath());
+    if (deviceMeasurementNodeOffset == null) {
+      return null;
+    }
+    // TODO: Use deviceMeasurementNodeOffset after FileLoaderUtils supports offset-based metadata
+    // loading in this branch.
+    return FileLoaderUtils.loadAlignedTimeSeriesMetadata(
+        resource,
+        alignedPath,
+        context,
+        globalTimeFilter,
+        resource.isSeq(),
+        context.isIgnoreAllNullRows());
+  }
+
+  private static long[] getDeviceMeasurementNodeOffset(
+      List<ExternalTsFileDeviceOffset> currentDeviceOffsets, String tsFilePath) {
+    for (ExternalTsFileDeviceOffset offset : currentDeviceOffsets) {
+      if (tsFilePath.equals(offset.getTsFilePath())) {
+        return offset.getDeviceMeasurementNodeOffset();
+      }
+    }
+    return null;
+  }
+
   @FunctionalInterface
   public interface ExternalTsFileMetadataLoader {
     AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
index daccb199d28..2ad90b1d460 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 import org.apache.iotdb.commons.path.AlignedFullPath;
 import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
@@ -92,35 +91,13 @@ public class ExternalTsFileTableScanOperator extends TableScanOperator {
   private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata(
       TsFileResource resource, AlignedFullPath alignedPath) throws IOException {
     List<ExternalTsFileDeviceOffset> currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex);
-    if (currentDeviceOffsets == null
-        || !getCurrentDeviceEntry().getDeviceID().equals(alignedPath.getDeviceId())) {
-      return null;
-    }
-
-    long[] deviceMeasurementNodeOffset =
-        getDeviceMeasurementNodeOffset(currentDeviceOffsets, resource.getTsFilePath());
-    if (deviceMeasurementNodeOffset == null) {
-      return null;
-    }
-    // TODO: Use deviceMeasurementNodeOffset after FileLoaderUtils supports offset-based metadata
-    // loading in this branch.
-    return FileLoaderUtils.loadAlignedTimeSeriesMetadata(
+    return ExternalTsFileSeriesScanUtil.loadTimeSeriesMetadata(
         resource,
         alignedPath,
+        getCurrentDeviceEntry().getDeviceID(),
+        currentDeviceOffsets,
         ((OperatorContext) operatorContext).getInstanceContext(),
-        seriesScanOptions.getGlobalTimeFilter(),
-        resource.isSeq(),
-        ((OperatorContext) operatorContext).getInstanceContext().isIgnoreAllNullRows());
-  }
-
-  private long[] getDeviceMeasurementNodeOffset(
-      List<ExternalTsFileDeviceOffset> currentDeviceOffsets, String tsFilePath) {
-    for (ExternalTsFileDeviceOffset offset : currentDeviceOffsets) {
-      if (tsFilePath.equals(offset.getTsFilePath())) {
-        return offset.getDeviceMeasurementNodeOffset();
-      }
-    }
-    return null;
+        seriesScanOptions.getGlobalTimeFilter());
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
index 5ceeef9f86b..85835931f24 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
@@ -96,6 +96,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.relational.Abst
 import org.apache.iotdb.db.queryengine.execution.operator.source.relational.CteScanOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.source.relational.DefaultAggTableScanOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.source.relational.DeviceIteratorScanOperator;
+import org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileAggTableScanOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileTableScanOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.source.relational.InformationSchemaTableScanOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.source.relational.LastQueryAggTableScanOperator;
@@ -124,6 +125,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileAggregationScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode;
@@ -1628,6 +1630,33 @@ public class DataNodeTableOperatorGenerator
     }
   }
 
+  @Override
+  public Operator visitExternalTsFileAggregationScan(
+      ExternalTsFileAggregationScanNode node, LocalExecutionPlanContext context) {
+    AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter parameter =
+        constructAbstractAggTableScanOperatorParameter(node, context);
+
+    ExternalTsFileAggTableScanOperator aggTableScanOperator =
+        new ExternalTsFileAggTableScanOperator(
+            parameter, node.getQualifiedObjectName().getObjectName(), node.getDeviceOffsets());
+
+    context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName());
+    addSource(
+        aggTableScanOperator,
+        context,
+        node,
+        parameter.getMeasurementColumnNames(),
+        parameter.getMeasurementSchemas(),
+        parameter.getAllSensors(),
+        ExternalTsFileAggregationScanNode.class.getSimpleName());
+
+    DataDriverContext dataDriverContext = (DataDriverContext) context.getDriverContext();
+    dataDriverContext.setQueryDataSourceType(QueryDataSourceType.EXTERNAL_TSFILE_SCAN);
+    context.getInstanceContext().addExternalTsFilePaths(node.getTsFilePaths());
+
+    return aggTableScanOperator;
+  }
+
   private LastQueryAggTableScanOperator constructLastQueryAggTableScanOperator(
       AggregationTableScanNode node,
       AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter parameter,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 73c41ab1079..81596f2d38c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -124,6 +124,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationT
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileAggregationScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode;
@@ -654,6 +655,10 @@ public interface PlanVisitor<R, C> extends ICoreQueryPlanVisitor<R, C> {
     return visitTableScan(node, context);
   }
 
+  default R visitExternalTsFileAggregationScan(ExternalTsFileAggregationScanNode node, C context) {
+    return visitAggregationTableScan(node, context);
+  }
+
   default R visitInformationSchemaTableScan(InformationSchemaTableScanNode node, C context) {
     return visitTableScan(node, context);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 104ed577882..105dc1c1935 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -97,6 +97,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggre
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CopyToNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileAggregationScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode;
@@ -797,6 +798,82 @@ public class TableDistributedPlanGenerator
     return result;
   }
 
+  @Override
+  public List<PlanNode> visitExternalTsFileAggregationScan(
+      ExternalTsFileAggregationScanNode node, PlanContext context) {
+    TRegionReplicaSet localRegionReplicaSet =
+        new TRegionReplicaSet(null, ImmutableList.of(DataNodeEndPoints.getLocalDataNodeLocation()));
+    node.setRegionReplicaSet(localRegionReplicaSet);
+    context.mostUsedRegion = node.getRegionReplicaSet();
+    List<PlanNode> resultNodes =
+        splitExternalTsFileAggregationScanByDeviceEntries(node, localRegionReplicaSet);
+    if (context.hasSortProperty) {
+      processSortProperty(node, resultNodes, context);
+    }
+    return resultNodes;
+  }
+
+  private List<PlanNode> splitExternalTsFileAggregationScanByDeviceEntries(
+      final ExternalTsFileAggregationScanNode node, final TRegionReplicaSet localRegionReplicaSet) {
+    List<DeviceEntry> deviceEntries = node.getDeviceEntries();
+    if (deviceEntries.size() <= 1) {
+      return Collections.singletonList(node);
+    }
+
+    int splitCount =
+        Math.min(
+            deviceEntries.size(),
+            IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism());
+    if (splitCount <= 1) {
+      return Collections.singletonList(node);
+    }
+
+    List<List<DeviceEntry>> splitDeviceEntries = new ArrayList<>(splitCount);
+    List<List<List<ExternalTsFileDeviceOffset>>> splitDeviceOffsets = new ArrayList<>(splitCount);
+    for (int i = 0; i < splitCount; i++) {
+      splitDeviceEntries.add(new ArrayList<>());
+      splitDeviceOffsets.add(new ArrayList<>());
+    }
+    for (int i = 0; i < deviceEntries.size(); i++) {
+      splitDeviceEntries.get(i % splitCount).add(deviceEntries.get(i));
+      splitDeviceOffsets.get(i % splitCount).add(node.getDeviceOffsets().get(i));
+    }
+
+    List<PlanNode> result = new ArrayList<>(splitCount);
+    for (int i = 0; i < splitDeviceEntries.size(); i++) {
+      List<DeviceEntry> entries = splitDeviceEntries.get(i);
+      if (entries.isEmpty()) {
+        continue;
+      }
+      ExternalTsFileAggregationScanNode splitNode =
+          new ExternalTsFileAggregationScanNode(
+              queryId.genPlanNodeId(),
+              node.getQualifiedObjectName(),
+              node.getOutputSymbols(),
+              node.getAssignments(),
+              entries,
+              node.getTagAndAttributeIndexMap(),
+              node.getScanOrder(),
+              node.getTimePredicate().orElse(null),
+              node.getPushDownPredicate(),
+              node.getPushDownLimit(),
+              node.getPushDownOffset(),
+              node.isPushLimitToEachDevice(),
+              node.containsNonAlignedDevice(),
+              node.getProjection(),
+              node.getAggregations(),
+              node.getGroupingSets(),
+              node.getPreGroupedSymbols(),
+              node.getStep(),
+              node.getGroupIdSymbol(),
+              node.getTsFilePaths(),
+              splitDeviceOffsets.get(i));
+      splitNode.setRegionReplicaSet(localRegionReplicaSet);
+      result.add(splitNode);
+    }
+    return result;
+  }
+
   private List<PlanNode> constructDeviceTableScanByTags(
       final DeviceTableScanNode node, final PlanContext context) {
     DataPartition dataPartition = analysis.getDataPartitionInfo();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
index e5298d01cd3..19057bc7146 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
@@ -295,6 +295,31 @@ public class AggregationTableScanNode extends DeviceTableScanNode {
       AggregationNode aggregationNode,
       ProjectNode projectNode,
       DeviceTableScanNode tableScanNode) {
+    if (tableScanNode instanceof ExternalTsFileScanNode) {
+      ExternalTsFileScanNode externalTsFileScanNode = (ExternalTsFileScanNode) tableScanNode;
+      return new ExternalTsFileAggregationScanNode(
+          id,
+          tableScanNode.getQualifiedObjectName(),
+          tableScanNode.getOutputSymbols(),
+          tableScanNode.getAssignments(),
+          tableScanNode.getDeviceEntries(),
+          tableScanNode.getTagAndAttributeIndexMap(),
+          tableScanNode.getScanOrder(),
+          tableScanNode.getTimePredicate().orElse(null),
+          tableScanNode.getPushDownPredicate(),
+          tableScanNode.getPushDownLimit(),
+          tableScanNode.getPushDownOffset(),
+          tableScanNode.isPushLimitToEachDevice(),
+          tableScanNode.containsNonAlignedDevice(),
+          projectNode == null ? null : projectNode.getAssignments(),
+          aggregationNode.getAggregations(),
+          aggregationNode.getGroupingSets(),
+          aggregationNode.getPreGroupedSymbols(),
+          aggregationNode.getStep(),
+          aggregationNode.getGroupIdSymbol(),
+          externalTsFileScanNode.getTsFilePaths(),
+          externalTsFileScanNode.getDeviceOffsets());
+    }
     if (tableScanNode instanceof TreeDeviceViewScanNode) {
       TreeDeviceViewScanNode treeDeviceViewScanNode = (TreeDeviceViewScanNode) tableScanNode;
       return new AggregationTreeDeviceViewScanNode(
@@ -349,6 +374,31 @@ public class AggregationTableScanNode extends DeviceTableScanNode {
       ProjectNode projectNode,
       DeviceTableScanNode tableScanNode,
       AggregationNode.Step step) {
+    if (tableScanNode instanceof ExternalTsFileScanNode) {
+      ExternalTsFileScanNode externalTsFileScanNode = (ExternalTsFileScanNode) tableScanNode;
+      return new ExternalTsFileAggregationScanNode(
+          id,
+          tableScanNode.getQualifiedObjectName(),
+          tableScanNode.getOutputSymbols(),
+          tableScanNode.getAssignments(),
+          tableScanNode.getDeviceEntries(),
+          tableScanNode.getTagAndAttributeIndexMap(),
+          tableScanNode.getScanOrder(),
+          tableScanNode.getTimePredicate().orElse(null),
+          tableScanNode.getPushDownPredicate(),
+          tableScanNode.getPushDownLimit(),
+          tableScanNode.getPushDownOffset(),
+          tableScanNode.isPushLimitToEachDevice(),
+          tableScanNode.containsNonAlignedDevice(),
+          projectNode == null ? null : projectNode.getAssignments(),
+          aggregationNode.getAggregations(),
+          aggregationNode.getGroupingSets(),
+          aggregationNode.getPreGroupedSymbols(),
+          step,
+          aggregationNode.getGroupIdSymbol(),
+          externalTsFileScanNode.getTsFilePaths(),
+          externalTsFileScanNode.getDeviceOffsets());
+    }
     if (tableScanNode instanceof TreeDeviceViewScanNode) {
       TreeDeviceViewScanNode treeDeviceViewScanNode = (TreeDeviceViewScanNode) tableScanNode;
       return new AggregationTreeDeviceViewScanNode(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java
new file mode 100644
index 00000000000..04434ad498e
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.IPlanVisitor;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema;
+import 
org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
+import 
org.apache.iotdb.commons.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol;
+import 
org.apache.iotdb.commons.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression;
+import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+public class ExternalTsFileAggregationScanNode extends 
AggregationTableScanNode {
+  private List<String> tsFilePaths;
+  private List<List<ExternalTsFileDeviceOffset>> deviceOffsets = 
Collections.emptyList();
+
+  public ExternalTsFileAggregationScanNode(
+      PlanNodeId id,
+      QualifiedObjectName qualifiedObjectName,
+      List<Symbol> outputSymbols,
+      Map<Symbol, ColumnSchema> assignments,
+      List<DeviceEntry> deviceEntries,
+      Map<Symbol, Integer> tagAndAttributeIndexMap,
+      Ordering scanOrder,
+      Expression timePredicate,
+      Expression pushDownPredicate,
+      long pushDownLimit,
+      long pushDownOffset,
+      boolean pushLimitToEachDevice,
+      boolean containsNonAlignedDevice,
+      Assignments projection,
+      Map<Symbol, AggregationNode.Aggregation> aggregations,
+      AggregationNode.GroupingSetDescriptor groupingSets,
+      List<Symbol> preGroupedSymbols,
+      AggregationNode.Step step,
+      Optional<Symbol> groupIdSymbol,
+      List<String> tsFilePaths,
+      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
+    super(
+        id,
+        qualifiedObjectName,
+        outputSymbols,
+        assignments,
+        deviceEntries,
+        tagAndAttributeIndexMap,
+        scanOrder,
+        timePredicate,
+        pushDownPredicate,
+        pushDownLimit,
+        pushDownOffset,
+        pushLimitToEachDevice,
+        containsNonAlignedDevice,
+        projection,
+        aggregations,
+        groupingSets,
+        preGroupedSymbols,
+        step,
+        groupIdSymbol);
+    this.tsFilePaths = Collections.unmodifiableList(new 
ArrayList<>(tsFilePaths));
+    this.deviceOffsets = copyDeviceOffsets(deviceOffsets);
+  }
+
+  protected ExternalTsFileAggregationScanNode() {}
+
+  @Override
+  public <R, C> R accept(IPlanVisitor<R, C> visitor, C context) {
+    return ((PlanVisitor<R, C>) 
visitor).visitExternalTsFileAggregationScan(this, context);
+  }
+
+  @Override
+  public ExternalTsFileAggregationScanNode clone() {
+    return new ExternalTsFileAggregationScanNode(
+        id,
+        qualifiedObjectName,
+        outputSymbols,
+        assignments,
+        deviceEntries,
+        tagAndAttributeIndexMap,
+        scanOrder,
+        timePredicate,
+        pushDownPredicate,
+        pushDownLimit,
+        pushDownOffset,
+        pushLimitToEachDevice,
+        containsNonAlignedDevice,
+        projection,
+        aggregations,
+        groupingSets,
+        preGroupedSymbols,
+        step,
+        groupIdSymbol,
+        tsFilePaths,
+        deviceOffsets);
+  }
+
+  public List<String> getTsFilePaths() {
+    return tsFilePaths;
+  }
+
+  public List<List<ExternalTsFileDeviceOffset>> getDeviceOffsets() {
+    return deviceOffsets;
+  }
+
+  @Override
+  public void sortDeviceEntries(Comparator<DeviceEntry> comparator) {
+    int[] indexes =
+        IntStream.range(0, deviceEntries.size())
+            .boxed()
+            .sorted(
+                (left, right) ->
+                    comparator.compare(deviceEntries.get(left), 
deviceEntries.get(right)))
+            .mapToInt(Integer::intValue)
+            .toArray();
+    List<DeviceEntry> sortedDeviceEntries = new 
ArrayList<>(deviceEntries.size());
+    List<List<ExternalTsFileDeviceOffset>> sortedDeviceOffsets =
+        new ArrayList<>(deviceOffsets.size());
+    for (int index : indexes) {
+      sortedDeviceEntries.add(deviceEntries.get(index));
+      sortedDeviceOffsets.add(deviceOffsets.get(index));
+    }
+    this.deviceEntries = sortedDeviceEntries;
+    this.deviceOffsets = sortedDeviceOffsets;
+  }
+
+  private static List<List<ExternalTsFileDeviceOffset>> copyDeviceOffsets(
+      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
+    List<List<ExternalTsFileDeviceOffset>> copiedDeviceOffsets =
+        new ArrayList<>(deviceOffsets.size());
+    for (List<ExternalTsFileDeviceOffset> offsets : deviceOffsets) {
+      copiedDeviceOffsets.add(new ArrayList<>(offsets));
+    }
+    return copiedDeviceOffsets;
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    throw new UnsupportedOperationException(
+        "ExternalTsFileAggregationScanNode cannot be serialized because it 
reads local external TsFiles");
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    throw new UnsupportedOperationException(
+        "ExternalTsFileAggregationScanNode cannot be serialized because it 
reads local external TsFiles");
+  }
+
+  public static ExternalTsFileAggregationScanNode deserialize(ByteBuffer 
byteBuffer) {
+    throw new UnsupportedOperationException(
+        "ExternalTsFileAggregationScanNode cannot be deserialized because it 
reads local external TsFiles");
+  }
+
+  @Override
+  public String toString() {
+    return "ExternalTsFileAggregationScanNode-" + this.getPlanNodeId();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
index b5072bfd5c6..c650a46119c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.SymbolRefere
 import 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
@@ -70,6 +71,7 @@ public class PushAggregationIntoTableScan implements 
PlanOptimizer {
         new Rewriter(),
         new Context(
             context.getQueryContext().getQueryId(),
+            context.getAnalysis(),
             context.getMetadata(),
             context.sessionInfo(),
             context.getSymbolAllocator()));
@@ -105,7 +107,6 @@ public class PushAggregationIntoTableScan implements 
PlanOptimizer {
 
       // only optimize AggregationNode with raw DeviceTableScanNode
       if (tableScanNode == null
-          || tableScanNode instanceof ExternalTsFileScanNode
           || tableScanNode instanceof AggregationTableScanNode) { // no need 
to optimize
         return node;
       }
@@ -116,6 +117,7 @@ public class PushAggregationIntoTableScan implements 
PlanOptimizer {
               node.getGroupingKeys(),
               projectNode,
               tableScanNode,
+              context.analysis,
               context.session,
               context.metadata);
       if (pushDownLevel == PushDownLevel.NOOP) { // no push-down
@@ -140,6 +142,7 @@ public class PushAggregationIntoTableScan implements 
PlanOptimizer {
         List<Symbol> groupingKeys,
         ProjectNode projectNode,
         DeviceTableScanNode tableScanNode,
+        Analysis analysis,
         SessionInfo session,
         Metadata metadata) {
       boolean hasProject = projectNode != null;
@@ -195,7 +198,7 @@ public class PushAggregationIntoTableScan implements 
PlanOptimizer {
         return PushDownLevel.NOOP;
       } else if (singleDeviceEntry
           || ImmutableSet.copyOf(groupingKeys)
-              .containsAll(getTagColumnsInTableStore(tableScanNode, metadata, 
session))) {
+              .containsAll(getTagColumnsInTableStore(tableScanNode, analysis, 
metadata, session))) {
         // If all tag columns appear in groupingKeys and no Measurement column 
appears, we can push
         // down completely.
         return PushDownLevel.COMPLETE;
@@ -205,7 +208,19 @@ public class PushAggregationIntoTableScan implements 
PlanOptimizer {
     }
 
     private List<Symbol> getTagColumnsInTableStore(
-        DeviceTableScanNode tableScanNode, Metadata metadata, SessionInfo 
session) {
+        DeviceTableScanNode tableScanNode,
+        Analysis analysis,
+        Metadata metadata,
+        SessionInfo session) {
+      if (tableScanNode instanceof ExternalTsFileScanNode) {
+        return analysis
+            .getTableColumnSchema(tableScanNode.getQualifiedObjectName())
+            .entrySet()
+            .stream()
+            .filter(entry -> entry.getValue().getColumnCategory() == TAG)
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
+      }
       return Objects.requireNonNull(
               metadata.getTableSchema(session, 
tableScanNode.getQualifiedObjectName()).orElse(null))
           .getColumns()
@@ -242,13 +257,19 @@ public class PushAggregationIntoTableScan implements 
PlanOptimizer {
 
   private static class Context {
     private final QueryId queryId;
+    private final Analysis analysis;
     private final Metadata metadata;
     private final SessionInfo session;
     private final SymbolAllocator symbolAllocator;
 
     public Context(
-        QueryId queryId, Metadata metadata, SessionInfo session, 
SymbolAllocator symbolAllocator) {
+        QueryId queryId,
+        Analysis analysis,
+        Metadata metadata,
+        SessionInfo session,
+        SymbolAllocator symbolAllocator) {
       this.queryId = queryId;
+      this.analysis = analysis;
       this.metadata = metadata;
       this.session = session;
       this.symbolAllocator = symbolAllocator;


Reply via email to