This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch read_tsfile_table_function in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4be16d3042a8f65a267a8f26ead6ffeb59989c72 Author: shuwenwei <[email protected]> AuthorDate: Fri Jun 5 17:16:47 2026 +0800 agg scan --- .../relational/AbstractAggTableScanOperator.java | 2 +- .../AbstractDefaultAggTableScanOperator.java | 2 +- ...ava => ExternalTsFileAggTableScanOperator.java} | 62 +++---- .../relational/ExternalTsFileSeriesScanUtil.java | 42 +++++ .../ExternalTsFileTableScanOperator.java | 31 +--- .../planner/DataNodeTableOperatorGenerator.java | 29 ++++ .../plan/planner/plan/node/PlanVisitor.java | 5 + .../distribute/TableDistributedPlanGenerator.java | 77 +++++++++ .../planner/node/AggregationTableScanNode.java | 50 ++++++ .../node/ExternalTsFileAggregationScanNode.java | 189 +++++++++++++++++++++ .../PushAggregationIntoTableScan.java | 29 +++- 11 files changed, 443 insertions(+), 75 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java index 1f37ce9f104..e3030c0752c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java @@ -96,7 +96,7 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe protected SeriesScanOptions seriesScanOptions; private final boolean ascending; - private final Ordering scanOrder; + protected final Ordering scanOrder; // Some special data types(like BLOB) cannot use statistics protected final boolean canUseStatistics; private final long cachedRawDataSize; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDefaultAggTableScanOperator.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDefaultAggTableScanOperator.java index f00b0049ef6..797d4491e41 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDefaultAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDefaultAggTableScanOperator.java @@ -34,7 +34,7 @@ import static org.apache.iotdb.calc.plan.planner.CommonOperatorUtils.TIME_COLUMN public abstract class AbstractDefaultAggTableScanOperator extends AbstractAggTableScanOperator { - private static final long INSTANCE_SIZE = + protected static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(AbstractDefaultAggTableScanOperator.class); protected AbstractDefaultAggTableScanOperator(AbstractAggTableScanOperatorParameter parameter) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java similarity index 66% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java index daccb199d28..79a101a6a79 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java @@ -22,28 +22,30 @@ package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational; import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; -import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils; +import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanUtil; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; +import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.RamUsageEstimator; import java.io.IOException; import java.util.ArrayList; import java.util.List; -public class ExternalTsFileTableScanOperator extends TableScanOperator { +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; + +public class ExternalTsFileAggTableScanOperator extends DefaultAggTableScanOperator { private static final long INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(ExternalTsFileTableScanOperator.class); - private static final long ABSTRACT_DEVICE_TABLE_SCAN_OPERATOR_INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(AbstractDeviceTableScanOperator.class); + RamUsageEstimator.shallowSizeOfInstance(ExternalTsFileAggTableScanOperator.class); private final String tableName; private final List<List<ExternalTsFileDeviceOffset>> deviceOffsets; - public ExternalTsFileTableScanOperator( - AbstractTableScanOperatorParameter parameter, + public ExternalTsFileAggTableScanOperator( + AbstractAggTableScanOperatorParameter parameter, String tableName, List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { super(parameter); @@ 
-68,15 +70,10 @@ public class ExternalTsFileTableScanOperator extends TableScanOperator { @Override protected void constructAlignedSeriesScanUtil() { - if (!hasCurrentDeviceEntry()) { - return; - } - - DeviceEntry deviceEntry = getCurrentDeviceEntry(); - if (deviceEntry == null) { - throw new IllegalStateException("Current device entry in TableScanOperator is empty"); - } - + DeviceEntry deviceEntry = + deviceEntries.isEmpty() || deviceEntries.get(currentDeviceIndex) == null + ? new AlignedDeviceEntry(SeriesScanUtil.EMPTY_DEVICE_ID, new Binary[0]) + : deviceEntries.get(currentDeviceIndex); this.seriesScanUtil = new ExternalTsFileSeriesScanUtil( constructAlignedPath( @@ -91,43 +88,24 @@ public class ExternalTsFileTableScanOperator extends TableScanOperator { private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( TsFileResource resource, AlignedFullPath alignedPath) throws IOException { - List<ExternalTsFileDeviceOffset> currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex); - if (currentDeviceOffsets == null - || !getCurrentDeviceEntry().getDeviceID().equals(alignedPath.getDeviceId())) { + if (deviceEntries.isEmpty() || currentDeviceIndex >= deviceEntries.size()) { return null; } - - long[] deviceMeasurementNodeOffset = - getDeviceMeasurementNodeOffset(currentDeviceOffsets, resource.getTsFilePath()); - if (deviceMeasurementNodeOffset == null) { - return null; - } - // TODO: Use deviceMeasurementNodeOffset after FileLoaderUtils supports offset-based metadata - // loading in this branch. 
- return FileLoaderUtils.loadAlignedTimeSeriesMetadata( + List<ExternalTsFileDeviceOffset> currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex); + return ExternalTsFileSeriesScanUtil.loadTimeSeriesMetadata( resource, alignedPath, + deviceEntries.get(currentDeviceIndex).getDeviceID(), + currentDeviceOffsets, ((OperatorContext) operatorContext).getInstanceContext(), - seriesScanOptions.getGlobalTimeFilter(), - resource.isSeq(), - ((OperatorContext) operatorContext).getInstanceContext().isIgnoreAllNullRows()); - } - - private long[] getDeviceMeasurementNodeOffset( - List<ExternalTsFileDeviceOffset> currentDeviceOffsets, String tsFilePath) { - for (ExternalTsFileDeviceOffset offset : currentDeviceOffsets) { - if (tsFilePath.equals(offset.getTsFilePath())) { - return offset.getDeviceMeasurementNodeOffset(); - } - } - return null; + seriesScanOptions.getGlobalTimeFilter()); } @Override public long ramBytesUsed() { return super.ramBytesUsed() + INSTANCE_SIZE - - ABSTRACT_DEVICE_TABLE_SCAN_OPERATOR_INSTANCE_SIZE + - AbstractDefaultAggTableScanOperator.INSTANCE_SIZE + RamUsageEstimator.sizeOfCollection(deviceOffsets); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java index f8e7766ffbe..f46e3b04346 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java @@ -20,8 +20,10 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; import org.apache.iotdb.commons.path.AlignedFullPath; +import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil; +import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; @@ -29,6 +31,8 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.filter.basic.Filter; import java.io.IOException; import java.util.List; @@ -78,6 +82,44 @@ public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil { // External TsFiles are not managed by IoTDB metadata, so no table/tree TTL applies here. } + static AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( + TsFileResource resource, + AlignedFullPath alignedPath, + IDeviceID currentDeviceID, + List<ExternalTsFileDeviceOffset> currentDeviceOffsets, + FragmentInstanceContext context, + Filter globalTimeFilter) + throws IOException { + if (currentDeviceOffsets == null || !currentDeviceID.equals(alignedPath.getDeviceId())) { + return null; + } + + long[] deviceMeasurementNodeOffset = + getDeviceMeasurementNodeOffset(currentDeviceOffsets, resource.getTsFilePath()); + if (deviceMeasurementNodeOffset == null) { + return null; + } + // TODO: Use deviceMeasurementNodeOffset after FileLoaderUtils supports offset-based metadata + // loading in this branch. 
+ return FileLoaderUtils.loadAlignedTimeSeriesMetadata( + resource, + alignedPath, + context, + globalTimeFilter, + resource.isSeq(), + context.isIgnoreAllNullRows()); + } + + private static long[] getDeviceMeasurementNodeOffset( + List<ExternalTsFileDeviceOffset> currentDeviceOffsets, String tsFilePath) { + for (ExternalTsFileDeviceOffset offset : currentDeviceOffsets) { + if (tsFilePath.equals(offset.getTsFilePath())) { + return offset.getDeviceMeasurementNodeOffset(); + } + } + return null; + } + @FunctionalInterface public interface ExternalTsFileMetadataLoader { AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java index daccb199d28..2ad90b1d460 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java @@ -22,7 +22,6 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; -import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -92,35 +91,13 @@ public class ExternalTsFileTableScanOperator extends TableScanOperator { private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( 
TsFileResource resource, AlignedFullPath alignedPath) throws IOException { List<ExternalTsFileDeviceOffset> currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex); - if (currentDeviceOffsets == null - || !getCurrentDeviceEntry().getDeviceID().equals(alignedPath.getDeviceId())) { - return null; - } - - long[] deviceMeasurementNodeOffset = - getDeviceMeasurementNodeOffset(currentDeviceOffsets, resource.getTsFilePath()); - if (deviceMeasurementNodeOffset == null) { - return null; - } - // TODO: Use deviceMeasurementNodeOffset after FileLoaderUtils supports offset-based metadata - // loading in this branch. - return FileLoaderUtils.loadAlignedTimeSeriesMetadata( + return ExternalTsFileSeriesScanUtil.loadTimeSeriesMetadata( resource, alignedPath, + getCurrentDeviceEntry().getDeviceID(), + currentDeviceOffsets, ((OperatorContext) operatorContext).getInstanceContext(), - seriesScanOptions.getGlobalTimeFilter(), - resource.isSeq(), - ((OperatorContext) operatorContext).getInstanceContext().isIgnoreAllNullRows()); - } - - private long[] getDeviceMeasurementNodeOffset( - List<ExternalTsFileDeviceOffset> currentDeviceOffsets, String tsFilePath) { - for (ExternalTsFileDeviceOffset offset : currentDeviceOffsets) { - if (tsFilePath.equals(offset.getTsFilePath())) { - return offset.getDeviceMeasurementNodeOffset(); - } - } - return null; + seriesScanOptions.getGlobalTimeFilter()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java index 5ceeef9f86b..85835931f24 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java @@ -96,6 +96,7 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.Abst import org.apache.iotdb.db.queryengine.execution.operator.source.relational.CteScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.DefaultAggTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.DeviceIteratorScanOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileAggTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.InformationSchemaTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.LastQueryAggTableScanOperator; @@ -124,6 +125,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode; @@ -1628,6 +1630,33 @@ public class DataNodeTableOperatorGenerator } } + @Override + public Operator visitExternalTsFileAggregationScan( + ExternalTsFileAggregationScanNode node, LocalExecutionPlanContext context) { + AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter parameter = + constructAbstractAggTableScanOperatorParameter(node, context); + + ExternalTsFileAggTableScanOperator aggTableScanOperator = + new 
ExternalTsFileAggTableScanOperator( + parameter, node.getQualifiedObjectName().getObjectName(), node.getDeviceOffsets()); + + context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName()); + addSource( + aggTableScanOperator, + context, + node, + parameter.getMeasurementColumnNames(), + parameter.getMeasurementSchemas(), + parameter.getAllSensors(), + ExternalTsFileAggregationScanNode.class.getSimpleName()); + + DataDriverContext dataDriverContext = (DataDriverContext) context.getDriverContext(); + dataDriverContext.setQueryDataSourceType(QueryDataSourceType.EXTERNAL_TSFILE_SCAN); + context.getInstanceContext().addExternalTsFilePaths(node.getTsFilePaths()); + + return aggTableScanOperator; + } + private LastQueryAggTableScanOperator constructLastQueryAggTableScanOperator( AggregationTableScanNode node, AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter parameter, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 73c41ab1079..81596f2d38c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -124,6 +124,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationT import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode; import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode; @@ -654,6 +655,10 @@ public interface PlanVisitor<R, C> extends ICoreQueryPlanVisitor<R, C> { return visitTableScan(node, context); } + default R visitExternalTsFileAggregationScan(ExternalTsFileAggregationScanNode node, C context) { + return visitAggregationTableScan(node, context); + } + default R visitInformationSchemaTableScan(InformationSchemaTableScanNode node, C context) { return visitTableScan(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 104ed577882..105dc1c1935 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -97,6 +97,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggre import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CopyToNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode; @@ -797,6 +798,82 @@ public 
class TableDistributedPlanGenerator return result; } + @Override + public List<PlanNode> visitExternalTsFileAggregationScan( + ExternalTsFileAggregationScanNode node, PlanContext context) { + TRegionReplicaSet localRegionReplicaSet = + new TRegionReplicaSet(null, ImmutableList.of(DataNodeEndPoints.getLocalDataNodeLocation())); + node.setRegionReplicaSet(localRegionReplicaSet); + context.mostUsedRegion = node.getRegionReplicaSet(); + List<PlanNode> resultNodes = + splitExternalTsFileAggregationScanByDeviceEntries(node, localRegionReplicaSet); + if (context.hasSortProperty) { + processSortProperty(node, resultNodes, context); + } + return resultNodes; + } + + private List<PlanNode> splitExternalTsFileAggregationScanByDeviceEntries( + final ExternalTsFileAggregationScanNode node, final TRegionReplicaSet localRegionReplicaSet) { + List<DeviceEntry> deviceEntries = node.getDeviceEntries(); + if (deviceEntries.size() <= 1) { + return Collections.singletonList(node); + } + + int splitCount = + Math.min( + deviceEntries.size(), + IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism()); + if (splitCount <= 1) { + return Collections.singletonList(node); + } + + List<List<DeviceEntry>> splitDeviceEntries = new ArrayList<>(splitCount); + List<List<List<ExternalTsFileDeviceOffset>>> splitDeviceOffsets = new ArrayList<>(splitCount); + for (int i = 0; i < splitCount; i++) { + splitDeviceEntries.add(new ArrayList<>()); + splitDeviceOffsets.add(new ArrayList<>()); + } + for (int i = 0; i < deviceEntries.size(); i++) { + splitDeviceEntries.get(i % splitCount).add(deviceEntries.get(i)); + splitDeviceOffsets.get(i % splitCount).add(node.getDeviceOffsets().get(i)); + } + + List<PlanNode> result = new ArrayList<>(splitCount); + for (int i = 0; i < splitDeviceEntries.size(); i++) { + List<DeviceEntry> entries = splitDeviceEntries.get(i); + if (entries.isEmpty()) { + continue; + } + ExternalTsFileAggregationScanNode splitNode = + new ExternalTsFileAggregationScanNode( + 
queryId.genPlanNodeId(), + node.getQualifiedObjectName(), + node.getOutputSymbols(), + node.getAssignments(), + entries, + node.getTagAndAttributeIndexMap(), + node.getScanOrder(), + node.getTimePredicate().orElse(null), + node.getPushDownPredicate(), + node.getPushDownLimit(), + node.getPushDownOffset(), + node.isPushLimitToEachDevice(), + node.containsNonAlignedDevice(), + node.getProjection(), + node.getAggregations(), + node.getGroupingSets(), + node.getPreGroupedSymbols(), + node.getStep(), + node.getGroupIdSymbol(), + node.getTsFilePaths(), + splitDeviceOffsets.get(i)); + splitNode.setRegionReplicaSet(localRegionReplicaSet); + result.add(splitNode); + } + return result; + } + private List<PlanNode> constructDeviceTableScanByTags( final DeviceTableScanNode node, final PlanContext context) { DataPartition dataPartition = analysis.getDataPartitionInfo(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java index e5298d01cd3..19057bc7146 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java @@ -295,6 +295,31 @@ public class AggregationTableScanNode extends DeviceTableScanNode { AggregationNode aggregationNode, ProjectNode projectNode, DeviceTableScanNode tableScanNode) { + if (tableScanNode instanceof ExternalTsFileScanNode) { + ExternalTsFileScanNode externalTsFileScanNode = (ExternalTsFileScanNode) tableScanNode; + return new ExternalTsFileAggregationScanNode( + id, + tableScanNode.getQualifiedObjectName(), + tableScanNode.getOutputSymbols(), + tableScanNode.getAssignments(), + tableScanNode.getDeviceEntries(), + 
tableScanNode.getTagAndAttributeIndexMap(), + tableScanNode.getScanOrder(), + tableScanNode.getTimePredicate().orElse(null), + tableScanNode.getPushDownPredicate(), + tableScanNode.getPushDownLimit(), + tableScanNode.getPushDownOffset(), + tableScanNode.isPushLimitToEachDevice(), + tableScanNode.containsNonAlignedDevice(), + projectNode == null ? null : projectNode.getAssignments(), + aggregationNode.getAggregations(), + aggregationNode.getGroupingSets(), + aggregationNode.getPreGroupedSymbols(), + aggregationNode.getStep(), + aggregationNode.getGroupIdSymbol(), + externalTsFileScanNode.getTsFilePaths(), + externalTsFileScanNode.getDeviceOffsets()); + } if (tableScanNode instanceof TreeDeviceViewScanNode) { TreeDeviceViewScanNode treeDeviceViewScanNode = (TreeDeviceViewScanNode) tableScanNode; return new AggregationTreeDeviceViewScanNode( @@ -349,6 +374,31 @@ public class AggregationTableScanNode extends DeviceTableScanNode { ProjectNode projectNode, DeviceTableScanNode tableScanNode, AggregationNode.Step step) { + if (tableScanNode instanceof ExternalTsFileScanNode) { + ExternalTsFileScanNode externalTsFileScanNode = (ExternalTsFileScanNode) tableScanNode; + return new ExternalTsFileAggregationScanNode( + id, + tableScanNode.getQualifiedObjectName(), + tableScanNode.getOutputSymbols(), + tableScanNode.getAssignments(), + tableScanNode.getDeviceEntries(), + tableScanNode.getTagAndAttributeIndexMap(), + tableScanNode.getScanOrder(), + tableScanNode.getTimePredicate().orElse(null), + tableScanNode.getPushDownPredicate(), + tableScanNode.getPushDownLimit(), + tableScanNode.getPushDownOffset(), + tableScanNode.isPushLimitToEachDevice(), + tableScanNode.containsNonAlignedDevice(), + projectNode == null ? 
null : projectNode.getAssignments(), + aggregationNode.getAggregations(), + aggregationNode.getGroupingSets(), + aggregationNode.getPreGroupedSymbols(), + step, + aggregationNode.getGroupIdSymbol(), + externalTsFileScanNode.getTsFilePaths(), + externalTsFileScanNode.getDeviceOffsets()); + } if (tableScanNode instanceof TreeDeviceViewScanNode) { TreeDeviceViewScanNode treeDeviceViewScanNode = (TreeDeviceViewScanNode) tableScanNode; return new AggregationTreeDeviceViewScanNode( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java new file mode 100644 index 00000000000..04434ad498e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.IPlanVisitor;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
+import org.apache.iotdb.commons.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+/**
+ * An {@link AggregationTableScanNode} that additionally carries a list of TsFile paths and, for
+ * each device entry, a list of {@link ExternalTsFileDeviceOffset}s.
+ *
+ * <p>{@code deviceOffsets} is kept index-aligned with {@code deviceEntries}: {@link
+ * #sortDeviceEntries(Comparator)} applies the same permutation to both lists.
+ *
+ * <p>The node refuses to be serialized or deserialized; every serialization entry point throws
+ * {@link UnsupportedOperationException}.
+ */
+public class ExternalTsFileAggregationScanNode extends AggregationTableScanNode {
+  // Unmodifiable snapshot of the paths handed to the constructor; stays null when the node is
+  // created through the no-arg constructor.
+  private List<String> tsFilePaths;
+  // deviceOffsets.get(i) belongs to deviceEntries.get(i).
+  private List<List<ExternalTsFileDeviceOffset>> deviceOffsets = Collections.emptyList();
+
+  /**
+   * Creates the node. All arguments except the last two are forwarded unchanged to the {@link
+   * AggregationTableScanNode} constructor.
+   *
+   * @param tsFilePaths paths of the TsFiles; copied into an unmodifiable list
+   * @param deviceOffsets one offset list per device entry; the outer list and every inner list are
+   *     copied, so later changes to the argument do not affect this node
+   */
+  public ExternalTsFileAggregationScanNode(
+      PlanNodeId id,
+      QualifiedObjectName qualifiedObjectName,
+      List<Symbol> outputSymbols,
+      Map<Symbol, ColumnSchema> assignments,
+      List<DeviceEntry> deviceEntries,
+      Map<Symbol, Integer> tagAndAttributeIndexMap,
+      Ordering scanOrder,
+      Expression timePredicate,
+      Expression pushDownPredicate,
+      long pushDownLimit,
+      long pushDownOffset,
+      boolean pushLimitToEachDevice,
+      boolean containsNonAlignedDevice,
+      Assignments projection,
+      Map<Symbol, AggregationNode.Aggregation> aggregations,
+      AggregationNode.GroupingSetDescriptor groupingSets,
+      List<Symbol> preGroupedSymbols,
+      AggregationNode.Step step,
+      Optional<Symbol> groupIdSymbol,
+      List<String> tsFilePaths,
+      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
+    super(
+        id,
+        qualifiedObjectName,
+        outputSymbols,
+        assignments,
+        deviceEntries,
+        tagAndAttributeIndexMap,
+        scanOrder,
+        timePredicate,
+        pushDownPredicate,
+        pushDownLimit,
+        pushDownOffset,
+        pushLimitToEachDevice,
+        containsNonAlignedDevice,
+        projection,
+        aggregations,
+        groupingSets,
+        preGroupedSymbols,
+        step,
+        groupIdSymbol);
+    this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(tsFilePaths));
+    this.deviceOffsets = copyDeviceOffsets(deviceOffsets);
+  }
+
+  /** Leaves {@code tsFilePaths} unset and {@code deviceOffsets} empty. */
+  protected ExternalTsFileAggregationScanNode() {}
+
+  /**
+   * Dispatches to {@link PlanVisitor#visitExternalTsFileAggregationScan}.
+   *
+   * <p>The visitor is cast to {@link PlanVisitor} unconditionally, so passing any other {@link
+   * IPlanVisitor} implementation fails with a {@link ClassCastException}.
+   */
+  @Override
+  public <R, C> R accept(IPlanVisitor<R, C> visitor, C context) {
+    return ((PlanVisitor<R, C>) visitor).visitExternalTsFileAggregationScan(this, context);
+  }
+
+  /**
+   * Returns a new node built from this node's current field values; the constructor copies {@code
+   * tsFilePaths} and {@code deviceOffsets} again.
+   */
+  @Override
+  public ExternalTsFileAggregationScanNode clone() {
+    return new ExternalTsFileAggregationScanNode(
+        id,
+        qualifiedObjectName,
+        outputSymbols,
+        assignments,
+        deviceEntries,
+        tagAndAttributeIndexMap,
+        scanOrder,
+        timePredicate,
+        pushDownPredicate,
+        pushDownLimit,
+        pushDownOffset,
+        pushLimitToEachDevice,
+        containsNonAlignedDevice,
+        projection,
+        aggregations,
+        groupingSets,
+        preGroupedSymbols,
+        step,
+        groupIdSymbol,
+        tsFilePaths,
+        deviceOffsets);
+  }
+
+  /** Returns the unmodifiable list of TsFile paths captured at construction. */
+  public List<String> getTsFilePaths() {
+    return tsFilePaths;
+  }
+
+  /**
+   * Returns the per-device offset lists. This is the node's own list, not a copy; element {@code i}
+   * corresponds to the {@code i}-th device entry.
+   */
+  public List<List<ExternalTsFileDeviceOffset>> getDeviceOffsets() {
+    return deviceOffsets;
+  }
+
+  /**
+   * Sorts the device entries with {@code comparator} and reorders {@code deviceOffsets} with the
+   * same permutation, so that both lists stay index-aligned.
+   *
+   * @param comparator ordering to apply to the device entries
+   * @throws IllegalStateException if there are fewer offset lists than device entries, i.e. the two
+   *     lists cannot be reordered together
+   */
+  @Override
+  public void sortDeviceEntries(Comparator<DeviceEntry> comparator) {
+    int deviceCount = deviceEntries.size();
+    // Fail before sorting anything: without this check the loop below would die halfway through
+    // with a bare IndexOutOfBoundsException from deviceOffsets.get(index).
+    if (deviceOffsets.size() < deviceCount) {
+      throw new IllegalStateException(
+          "deviceOffsets has "
+              + deviceOffsets.size()
+              + " entries but "
+              + deviceCount
+              + " device entries must be sorted in "
+              + this);
+    }
+    // Sort positions instead of entries so the resulting permutation can be replayed on both
+    // lists.
+    int[] sortedIndexes =
+        IntStream.range(0, deviceCount)
+            .boxed()
+            .sorted(
+                (left, right) ->
+                    comparator.compare(deviceEntries.get(left), deviceEntries.get(right)))
+            .mapToInt(Integer::intValue)
+            .toArray();
+    List<DeviceEntry> sortedDeviceEntries = new ArrayList<>(deviceCount);
+    List<List<ExternalTsFileDeviceOffset>> sortedDeviceOffsets = new ArrayList<>(deviceCount);
+    for (int index : sortedIndexes) {
+      sortedDeviceEntries.add(deviceEntries.get(index));
+      sortedDeviceOffsets.add(deviceOffsets.get(index));
+    }
+    this.deviceEntries = sortedDeviceEntries;
+    this.deviceOffsets = sortedDeviceOffsets;
+  }
+
+  /** Copies the outer list and each inner list; the offset objects themselves are shared. */
+  private static List<List<ExternalTsFileDeviceOffset>> copyDeviceOffsets(
+      List<List<ExternalTsFileDeviceOffset>> deviceOffsets) {
+    List<List<ExternalTsFileDeviceOffset>> copiedDeviceOffsets =
+        new ArrayList<>(deviceOffsets.size());
+    for (List<ExternalTsFileDeviceOffset> offsets : deviceOffsets) {
+      copiedDeviceOffsets.add(new ArrayList<>(offsets));
+    }
+    return copiedDeviceOffsets;
+  }
+
+  /** Always throws: this node is not serializable. */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    throw new UnsupportedOperationException(
+        "ExternalTsFileAggregationScanNode cannot be serialized because it reads local external TsFiles");
+  }
+
+  /** Always throws: this node is not serializable. */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    throw new UnsupportedOperationException(
+        "ExternalTsFileAggregationScanNode cannot be serialized because it reads local external TsFiles");
+  }
+
+  /** Always throws: this node cannot be rebuilt from a buffer. */
+  public static ExternalTsFileAggregationScanNode deserialize(ByteBuffer byteBuffer) {
+    throw new UnsupportedOperationException(
+        "ExternalTsFileAggregationScanNode cannot be deserialized because it reads local external TsFiles");
+  }
+
+  @Override
+  public String toString() {
+    return "ExternalTsFileAggregationScanNode-" + this.getPlanNodeId();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java index b5072bfd5c6..c650a46119c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java @@ -30,6 +30,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.SymbolRefere import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; @@ -70,6 +71,7 @@ public class PushAggregationIntoTableScan implements PlanOptimizer { new Rewriter(), new Context( context.getQueryContext().getQueryId(), + context.getAnalysis(), context.getMetadata(), context.sessionInfo(), context.getSymbolAllocator())); @@ -105,7 +107,6 @@ public class PushAggregationIntoTableScan implements PlanOptimizer { // only optimize AggregationNode with raw DeviceTableScanNode if (tableScanNode == null - || tableScanNode instanceof ExternalTsFileScanNode || tableScanNode instanceof AggregationTableScanNode) { // no need to optimize return node; } @@ -116,6 +117,7 @@ public class PushAggregationIntoTableScan implements PlanOptimizer { node.getGroupingKeys(), projectNode, tableScanNode, + context.analysis, context.session, context.metadata); if (pushDownLevel == PushDownLevel.NOOP) { // no push-down @@ -140,6 +142,7 @@ public class 
PushAggregationIntoTableScan implements PlanOptimizer { List<Symbol> groupingKeys, ProjectNode projectNode, DeviceTableScanNode tableScanNode, + Analysis analysis, SessionInfo session, Metadata metadata) { boolean hasProject = projectNode != null; @@ -195,7 +198,7 @@ public class PushAggregationIntoTableScan implements PlanOptimizer { return PushDownLevel.NOOP; } else if (singleDeviceEntry || ImmutableSet.copyOf(groupingKeys) - .containsAll(getTagColumnsInTableStore(tableScanNode, metadata, session))) { + .containsAll(getTagColumnsInTableStore(tableScanNode, analysis, metadata, session))) { // If all tag columns appear in groupingKeys and no Measurement column appears, we can push // down completely. return PushDownLevel.COMPLETE; @@ -205,7 +208,19 @@ public class PushAggregationIntoTableScan implements PlanOptimizer { } private List<Symbol> getTagColumnsInTableStore( - DeviceTableScanNode tableScanNode, Metadata metadata, SessionInfo session) { + DeviceTableScanNode tableScanNode, + Analysis analysis, + Metadata metadata, + SessionInfo session) { + if (tableScanNode instanceof ExternalTsFileScanNode) { + return analysis + .getTableColumnSchema(tableScanNode.getQualifiedObjectName()) + .entrySet() + .stream() + .filter(entry -> entry.getValue().getColumnCategory() == TAG) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } return Objects.requireNonNull( metadata.getTableSchema(session, tableScanNode.getQualifiedObjectName()).orElse(null)) .getColumns() @@ -242,13 +257,19 @@ public class PushAggregationIntoTableScan implements PlanOptimizer { private static class Context { private final QueryId queryId; + private final Analysis analysis; private final Metadata metadata; private final SessionInfo session; private final SymbolAllocator symbolAllocator; public Context( - QueryId queryId, Metadata metadata, SessionInfo session, SymbolAllocator symbolAllocator) { + QueryId queryId, + Analysis analysis, + Metadata metadata, + SessionInfo session, + 
SymbolAllocator symbolAllocator) { this.queryId = queryId; + this.analysis = analysis; this.metadata = metadata; this.session = session; this.symbolAllocator = symbolAllocator;
