This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch read_tsfile_table_function in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 342c4e1e17d9b6c60934186eb3ec6083e8358aad Author: shuwenwei <[email protected]> AuthorDate: Thu Jun 4 18:38:44 2026 +0800 impl 2 --- ...r.java => ExternalTsFileTableScanOperator.java} | 142 +++++-------- .../UnorderedExternalTsFileTableScanOperator.java | 71 ++++++- .../planner/DataNodeTableOperatorGenerator.java | 52 +---- .../plan/planner/plan/node/PlanGraphPrinter.java | 8 - .../distribute/TableDistributedPlanGenerator.java | 144 ++++++++++++- .../iterative/rule/PruneTableScanColumns.java | 6 +- .../planner/node/ExternalTsFileScanNode.java | 222 +++++++++++---------- .../optimizations/PushPredicateIntoTableScan.java | 94 ++++++++- .../optimizations/UnaliasSymbolReferences.java | 6 +- .../relational/tvf/ReadTsFileTableFunction.java | 47 ++--- 10 files changed, 490 insertions(+), 302 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java similarity index 51% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java index b1eb554a3ef..0c68ec96c9b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java @@ -19,46 +19,45 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; -import org.apache.iotdb.commons.schema.filter.SchemaFilter; +import org.apache.iotdb.commons.path.AlignedFullPath; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; -import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; +import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import org.apache.iotdb.db.utils.EncryptDBUtils; -import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.read.TsFileSequenceReader; -import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; import org.apache.tsfile.utils.RamUsageEstimator; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import static org.apache.iotdb.calc.plan.planner.CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING; -public class UnorderedExternalTsFileTableScanOperator extends AbstractTableScanOperator { +public class ExternalTsFileTableScanOperator extends AbstractTableScanOperator { private static final long INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(UnorderedExternalTsFileTableScanOperator.class); + RamUsageEstimator.shallowSizeOfInstance(ExternalTsFileTableScanOperator.class); private final String tableName; - private final SchemaFilter deviceFilter; + private final List<DeviceEntry> deviceEntries; + private final List<List<ExternalTsFileDeviceOffset>> deviceOffsets; - private MultiTsFileResourceIterator deviceIterator; - private Map<TsFileResource, TsFileSequenceReader> resourceReaderMap = Collections.emptyMap(); private DeviceEntry currentDeviceEntry; + private List<ExternalTsFileDeviceOffset> currentDeviceOffsets; private int currentDeviceIndex; - public UnorderedExternalTsFileTableScanOperator( - AbstractTableScanOperatorParameter parameter, String tableName, SchemaFilter deviceFilter) { + public ExternalTsFileTableScanOperator( + AbstractTableScanOperatorParameter parameter, + String tableName, + List<DeviceEntry> deviceEntries, + List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { super(parameter); this.tableName = tableName; - this.deviceFilter = deviceFilter; + this.deviceEntries = new ArrayList<>(deviceEntries); + this.deviceOffsets = new ArrayList<>(deviceOffsets); this.currentDeviceIndex = 0; } @@ -76,69 +75,22 @@ public class UnorderedExternalTsFileTableScanOperator extends AbstractTableScanO @Override public void initQueryDataSource(IQueryDataSource dataSource) { super.initQueryDataSource(dataSource); - - QueryDataSource queryDataSource = (QueryDataSource) dataSource; - initDeviceIterator(queryDataSource); currentDeviceEntry = nextDeviceEntry(); recordCurrentDeviceIndex(); constructAlignedSeriesScanUtil(); if (seriesScanUtil != null) { - seriesScanUtil.initQueryDataSource(queryDataSource); + seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource); } } - private void initDeviceIterator(QueryDataSource queryDataSource) { - resourceReaderMap = createResourceReaderMap(getAllResources(queryDataSource)); - deviceIterator = - new MultiTsFileResourceIterator( - tableName, - queryDataSource.getSeqResources(), - queryDataSource.getUnseqResources(), - resourceReaderMap, - ((OperatorContext) operatorContext).getInstanceContext(), - seriesScanOptions, - deviceFilter); - } - - private Map<TsFileResource, TsFileSequenceReader> createResourceReaderMap( - List<TsFileResource> resources) { - Map<TsFileResource, TsFileSequenceReader> readerMap = new HashMap<>(resources.size()); - for (TsFileResource resource : resources) { - try { - readerMap.put( - resource, - new TsFileSequenceReader( - resource.getTsFilePath(), - ((OperatorContext) operatorContext) - .getInstanceContext() - .getQueryStatistics() - .getLoadTimeSeriesMetadataActualIOSize() - ::addAndGet, - EncryptDBUtils.getFirstEncryptParamFromTSFilePath(resource.getTsFilePath()))); - } catch (IOException e) { - closeResourceReaders(readerMap); - throw new RuntimeException( - "Failed to open external TsFile reader: " + resource.getTsFilePath(), e); - } - } - return readerMap; - } - - private List<TsFileResource> getAllResources(QueryDataSource queryDataSource) { - List<TsFileResource> resources = - new ArrayList<>( - queryDataSource.getSeqResources().size() + queryDataSource.getUnseqResources().size()); - resources.addAll(queryDataSource.getSeqResources()); - resources.addAll(queryDataSource.getUnseqResources()); - return resources; - } - private DeviceEntry nextDeviceEntry() { - if (deviceIterator == null || !deviceIterator.hasNextDevice()) { + if (currentDeviceIndex >= deviceEntries.size()) { + currentDeviceOffsets = null; return null; } - IDeviceID nextDevice = deviceIterator.nextDevice(); - return nextDevice == null ? null : new AlignedDeviceEntry(nextDevice, new Binary[0]); + DeviceEntry deviceEntry = deviceEntries.get(currentDeviceIndex); + currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex); + return deviceEntry; } @Override @@ -184,15 +136,38 @@ public class UnorderedExternalTsFileTableScanOperator extends AbstractTableScanO ((OperatorContext) operatorContext).getInstanceContext(), true, measurementColumnTSDataTypes, - deviceIterator); + this::loadTimeSeriesMetadata); } - @Override - public void close() throws Exception { - closeResourceReaders(resourceReaderMap); - resourceReaderMap = Collections.emptyMap(); - deviceIterator = null; - super.close(); + private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( + TsFileResource resource, AlignedFullPath alignedPath) throws IOException { + if (currentDeviceOffsets == null + || !getCurrentDeviceEntry().getDeviceID().equals(alignedPath.getDeviceId())) { + return null; + } + + long[] deviceMeasurementNodeOffset = getDeviceMeasurementNodeOffset(resource.getTsFilePath()); + if (deviceMeasurementNodeOffset == null) { + return null; + } + // TODO: Use deviceMeasurementNodeOffset after FileLoaderUtils supports offset-based metadata + // loading in this branch. + return FileLoaderUtils.loadAlignedTimeSeriesMetadata( + resource, + alignedPath, + ((OperatorContext) operatorContext).getInstanceContext(), + seriesScanOptions.getGlobalTimeFilter(), + resource.isSeq(), + ((OperatorContext) operatorContext).getInstanceContext().isIgnoreAllNullRows()); + } + + private long[] getDeviceMeasurementNodeOffset(String tsFilePath) { + for (ExternalTsFileDeviceOffset offset : currentDeviceOffsets) { + if (tsFilePath.equals(offset.getTsFilePath())) { + return offset.getDeviceMeasurementNodeOffset(); + } + } + return null; } @Override @@ -200,16 +175,7 @@ public class UnorderedExternalTsFileTableScanOperator extends AbstractTableScanO return super.ramBytesUsed() + INSTANCE_SIZE - AbstractTableScanOperator.INSTANCE_SIZE - + RamUsageEstimator.sizeOfMap(resourceReaderMap); - } - - private void closeResourceReaders(Map<TsFileResource, TsFileSequenceReader> readerMap) { - for (TsFileSequenceReader reader : readerMap.values()) { - try { - reader.close(); - } catch (IOException ignored) { - // ignore close failure - } - } + + RamUsageEstimator.sizeOfCollection(deviceEntries) + + RamUsageEstimator.sizeOfCollection(deviceOffsets); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java index b1eb554a3ef..215fdd66f33 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java @@ -19,8 +19,11 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; +import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.schema.filter.SchemaFilter; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils; import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; @@ -28,6 +31,7 @@ import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.EncryptDBUtils; +import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.utils.Binary; @@ -48,17 +52,28 @@ public class UnorderedExternalTsFileTableScanOperator extends AbstractTableScanO private final String tableName; private final SchemaFilter deviceFilter; + private final List<DeviceEntry> deviceEntries; + private final List<List<ExternalTsFileDeviceOffset>> deviceOffsets; + private final ExternalTsFileDeviceFilterVisitor deviceFilterVisitor = + new ExternalTsFileDeviceFilterVisitor(); private MultiTsFileResourceIterator deviceIterator; private Map<TsFileResource, TsFileSequenceReader> resourceReaderMap = Collections.emptyMap(); private DeviceEntry currentDeviceEntry; + private List<ExternalTsFileDeviceOffset> currentDeviceOffsets; private int currentDeviceIndex; public UnorderedExternalTsFileTableScanOperator( - AbstractTableScanOperatorParameter parameter, String tableName, SchemaFilter deviceFilter) { + AbstractTableScanOperatorParameter parameter, + String tableName, + SchemaFilter deviceFilter, + List<DeviceEntry> deviceEntries, + List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { super(parameter); this.tableName = tableName; this.deviceFilter = deviceFilter; + this.deviceEntries = deviceEntries; + this.deviceOffsets = deviceOffsets; this.currentDeviceIndex = 0; } @@ -78,7 +93,9 @@ public class UnorderedExternalTsFileTableScanOperator extends AbstractTableScanO super.initQueryDataSource(dataSource); QueryDataSource queryDataSource = (QueryDataSource) dataSource; - initDeviceIterator(queryDataSource); + if (deviceEntries.isEmpty()) { + initDeviceIterator(queryDataSource); + } currentDeviceEntry = nextDeviceEntry(); recordCurrentDeviceIndex(); constructAlignedSeriesScanUtil(); @@ -134,6 +151,19 @@ public class UnorderedExternalTsFileTableScanOperator extends AbstractTableScanO } private DeviceEntry nextDeviceEntry() { + if (!deviceEntries.isEmpty()) { + while (currentDeviceIndex < deviceEntries.size()) { + currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex); + DeviceEntry deviceEntry = deviceEntries.get(currentDeviceIndex); + if (isDeviceMatched(deviceEntry.getDeviceID())) { + return deviceEntry; + } + currentDeviceIndex++; + } + currentDeviceOffsets = null; + return null; + } + if (deviceIterator == null || !deviceIterator.hasNextDevice()) { return null; } @@ -184,7 +214,42 @@ public class UnorderedExternalTsFileTableScanOperator extends AbstractTableScanO ((OperatorContext) operatorContext).getInstanceContext(), true, measurementColumnTSDataTypes, - deviceIterator); + deviceEntries.isEmpty() + ? deviceIterator::loadTimeSeriesMetadata + : this::loadTimeSeriesMetadata); + } + + private boolean isDeviceMatched(IDeviceID deviceID) { + return deviceFilter == null + || Boolean.TRUE.equals(deviceFilter.accept(deviceFilterVisitor, deviceID)); + } + + private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( + TsFileResource resource, AlignedFullPath alignedPath) throws IOException { + if (currentDeviceOffsets == null + || !getCurrentDeviceEntry().getDeviceID().equals(alignedPath.getDeviceId())) { + return null; + } + if (!containsCurrentDevice(resource)) { + return null; + } + return FileLoaderUtils.loadAlignedTimeSeriesMetadata( + resource, + alignedPath, + ((OperatorContext) operatorContext).getInstanceContext(), + seriesScanOptions.getGlobalTimeFilter(), + resource.isSeq(), + ((OperatorContext) operatorContext).getInstanceContext().isIgnoreAllNullRows()); + } + + private boolean containsCurrentDevice(TsFileResource resource) { + String tsFilePath = resource.getTsFilePath(); + for (ExternalTsFileDeviceOffset offset : currentDeviceOffsets) { + if (tsFilePath.equals(offset.getTsFilePath())) { + return true; + } + } + return false; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java index a857e063304..6faca66d7aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java @@ -51,9 +51,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.FunctionCall import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.LongLiteral; import org.apache.iotdb.commons.queryengine.plan.relational.type.InternalTypeManager; import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils; -import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.commons.schema.table.TsTable; -import org.apache.iotdb.commons.schema.table.column.TagColumnSchema; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -98,22 +96,20 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.relational.Abst import org.apache.iotdb.db.queryengine.execution.operator.source.relational.CteScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.DefaultAggTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.DeviceIteratorScanOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.InformationSchemaTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.LastQueryAggTableScanOperator; -import org.apache.iotdb.db.queryengine.execution.operator.source.relational.OrderedExternalTsFileTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewAggregationScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeNonAlignedDeviceViewAggregationScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeToTableViewAdaptorOperator; -import org.apache.iotdb.db.queryengine.execution.operator.source.relational.UnorderedExternalTsFileTableScanOperator; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.read.CountSchemaMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.ConvertPredicateToTimeFilterVisitor; -import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.schema.ConvertSchemaPredicateToFilterVisitor; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.metadata.NonAlignedDeviceEntry; @@ -1132,18 +1128,13 @@ public class DataNodeTableOperatorGenerator ExternalTsFileScanNode node, LocalExecutionPlanContext context) { AbstractTableScanOperator.AbstractTableScanOperatorParameter parameter = constructExternalTsFileTableScanOperatorParameter(node, context); - SchemaFilter deviceFilter = constructExternalTsFileDeviceFilter(node); AbstractTableScanOperator externalTsFileTableScanOperator = - node.getPushedOrderingScheme().isPresent() - ? new OrderedExternalTsFileTableScanOperator( - parameter, - node.getQualifiedObjectName().getObjectName(), - node.getAssignments(), - node.getPushedOrderingScheme().get(), - deviceFilter) - : new UnorderedExternalTsFileTableScanOperator( - parameter, node.getQualifiedObjectName().getObjectName(), deviceFilter); + new ExternalTsFileTableScanOperator( + parameter, + node.getQualifiedObjectName().getObjectName(), + node.getDeviceEntries(), + node.getDeviceOffsets()); context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName()); @@ -1156,31 +1147,6 @@ public class DataNodeTableOperatorGenerator return externalTsFileTableScanOperator; } - private SchemaFilter constructExternalTsFileDeviceFilter(ExternalTsFileScanNode node) { - if (!node.getTagPredicate().isPresent()) { - return null; - } - TsTable table = new TsTable(node.getQualifiedObjectName().getObjectName()); - for (Map.Entry<Symbol, ColumnSchema> entry : node.getAssignments().entrySet()) { - ColumnSchema columnSchema = entry.getValue(); - if (columnSchema.getColumnCategory() == TsTableColumnCategory.TAG) { - table.addColumnSchema( - new TagColumnSchema(entry.getKey().getName(), getTSDataType(columnSchema.getType()))); - } - } - SchemaFilter deviceFilter = - node.getTagPredicate() - .get() - .accept( - new ConvertSchemaPredicateToFilterVisitor(), - new ConvertSchemaPredicateToFilterVisitor.Context(table)); - if (deviceFilter == null) { - throw new UnsupportedOperationException( - "Unsupported external TsFile device filter: " + node.getTagPredicate().get()); - } - return deviceFilter; - } - private AbstractTableScanOperator.AbstractTableScanOperatorParameter constructExternalTsFileTableScanOperatorParameter( ExternalTsFileScanNode node, LocalExecutionPlanContext context) { @@ -1202,11 +1168,7 @@ public class DataNodeTableOperatorGenerator OperatorContext operatorContext = addOperatorContext( - context, - node.getPlanNodeId(), - node.getPushedOrderingScheme().isPresent() - ? OrderedExternalTsFileTableScanOperator.class.getSimpleName() - : UnorderedExternalTsFileTableScanOperator.class.getSimpleName()); + context, node.getPlanNodeId(), ExternalTsFileTableScanOperator.class.getSimpleName()); Set<String> allSensors = new HashSet<>(commonParameter.measurementColumnNames); // for time column diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 4338fe8ad43..3ab8b283d0b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -684,18 +684,10 @@ public class PlanGraphPrinter implements PlanVisitor<List<String>, PlanGraphPrin if (externalTsFileScanNode != null) { boxValue.add(String.format("ScanOrder: %s", externalTsFileScanNode.getScanOrder())); boxValue.add(String.format("TsFilePaths: %s", externalTsFileScanNode.getTsFilePaths())); - externalTsFileScanNode - .getPushedOrderingScheme() - .ifPresent( - orderingScheme -> - boxValue.add(String.format("PushedOrderingScheme: %s", orderingScheme))); externalTsFileScanNode .getTimePredicate() .ifPresent( timePredicate -> boxValue.add(String.format("TimePredicate: %s", timePredicate))); - externalTsFileScanNode - .getTagPredicate() - .ifPresent(tagPredicate -> boxValue.add(String.format("TagPredicate: %s", tagPredicate))); } if (node.getPushDownPredicate() != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 6c54af74445..d6cc700a14f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -73,6 +73,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.SymbolRefere import org.apache.iotdb.commons.schema.table.InformationSchema; import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -731,16 +732,74 @@ public class TableDistributedPlanGenerator @Override public List<PlanNode> visitExternalTsFileScan( final ExternalTsFileScanNode node, final PlanContext context) { - node.setRegionReplicaSet( - new TRegionReplicaSet( - null, ImmutableList.of(DataNodeEndPoints.getLocalDataNodeLocation()))); + TRegionReplicaSet localRegionReplicaSet = + new TRegionReplicaSet(null, ImmutableList.of(DataNodeEndPoints.getLocalDataNodeLocation())); + node.setRegionReplicaSet(localRegionReplicaSet); context.mostUsedRegion = node.getRegionReplicaSet(); if (context.hasSortProperty) { processExternalTsFileSortProperty(node, context); + return Collections.singletonList(node); + } + + List<PlanNode> splitNodes = splitExternalTsFileScanByDeviceEntries(node, localRegionReplicaSet); + if (!splitNodes.isEmpty()) { + return splitNodes; } return Collections.singletonList(node); } + private List<PlanNode> splitExternalTsFileScanByDeviceEntries( + final ExternalTsFileScanNode node, final TRegionReplicaSet localRegionReplicaSet) { + List<DeviceEntry> deviceEntries = node.getDeviceEntries(); + if (deviceEntries.size() <= 1) { + return Collections.emptyList(); + } + + int splitCount = + Math.min( + deviceEntries.size(), + IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism()); + if (splitCount <= 1) { + return Collections.emptyList(); + } + + List<List<DeviceEntry>> splitDeviceEntries = new ArrayList<>(splitCount); + List<List<List<ExternalTsFileDeviceOffset>>> splitDeviceOffsets = new ArrayList<>(splitCount); + for (int i = 0; i < splitCount; i++) { + splitDeviceEntries.add(new ArrayList<>()); + splitDeviceOffsets.add(new ArrayList<>()); + } + for (int i = 0; i < deviceEntries.size(); i++) { + splitDeviceEntries.get(i % splitCount).add(deviceEntries.get(i)); + splitDeviceOffsets.get(i % splitCount).add(node.getDeviceOffsets().get(i)); + } + + List<PlanNode> result = new ArrayList<>(splitCount); + for (int i = 0; i < splitDeviceEntries.size(); i++) { + List<DeviceEntry> entries = splitDeviceEntries.get(i); + if (entries.isEmpty()) { + continue; + } + ExternalTsFileScanNode splitNode = + new ExternalTsFileScanNode( + queryId.genPlanNodeId(), + node.getQualifiedObjectName(), + node.getOutputSymbols(), + node.getAssignments(), + node.getPushDownPredicate(), + node.getPushDownLimit(), + node.getPushDownOffset(), + node.getTimePredicate().orElse(null), + node.getScanOrder(), + node.getTsFilePaths(), + entries, + splitDeviceOffsets.get(i)); + splitNode.setRegionReplicaSet(localRegionReplicaSet); + result.add(splitNode); + } + return result; + } + private List<PlanNode> constructDeviceTableScanByTags( final DeviceTableScanNode node, final PlanContext context) { DataPartition dataPartition = analysis.getDataPartitionInfo(); @@ -1969,13 +2028,32 @@ public class TableDistributedPlanGenerator return; } - OrderingScheme pushedOrderingScheme = + OrderingScheme orderingScheme = new OrderingScheme( newOrderingSymbols, IntStream.range(0, newOrderingSymbols.size()) .boxed() .collect(Collectors.toMap(newOrderingSymbols::get, newSortOrders::get))); - externalTsFileScanNode.setPushedOrderingScheme(pushedOrderingScheme); + + Comparator<DeviceEntry> comparator = + createExternalTsFileDeviceEntryComparator( + externalTsFileScanNode, newOrderingSymbols, newSortOrders); + if (comparator != null) { + Map<IDeviceID, List<ExternalTsFileDeviceOffset>> offsetMap = new HashMap<>(); + List<DeviceEntry> deviceEntries = externalTsFileScanNode.getDeviceEntries(); + List<List<ExternalTsFileDeviceOffset>> deviceOffsets = + externalTsFileScanNode.getDeviceOffsets(); + for (int i = 0; i < deviceEntries.size(); i++) { + offsetMap.put(deviceEntries.get(i).getDeviceID(), deviceOffsets.get(i)); + } + externalTsFileScanNode.getDeviceEntries().sort(comparator); + List<List<ExternalTsFileDeviceOffset>> sortedDeviceOffsets = + new ArrayList<>(externalTsFileScanNode.getDeviceEntries().size()); + for (DeviceEntry deviceEntry : externalTsFileScanNode.getDeviceEntries()) { + sortedDeviceOffsets.add(offsetMap.get(deviceEntry.getDeviceID())); + } + externalTsFileScanNode.setDeviceOffsets(sortedDeviceOffsets); + } if (lastIsTimeRelated) { if (newOrderingSymbols.size() > 1 @@ -1989,14 +2067,66 @@ public class TableDistributedPlanGenerator .boxed() .collect(Collectors.toMap(newOrderingSymbols::get, newSortOrders::get))), newOrderingSymbols.size() - 2)) { - nodeOrderingMap.put(externalTsFileScanNode.getPlanNodeId(), pushedOrderingScheme); + nodeOrderingMap.put(externalTsFileScanNode.getPlanNodeId(), orderingScheme); } return; } if (newOrderingSymbols.size() == expectedOrderingScheme.getOrderBy().size()) { - nodeOrderingMap.put(externalTsFileScanNode.getPlanNodeId(), pushedOrderingScheme); + nodeOrderingMap.put(externalTsFileScanNode.getPlanNodeId(), orderingScheme); + } + } + + private Comparator<DeviceEntry> createExternalTsFileDeviceEntryComparator( + ExternalTsFileScanNode externalTsFileScanNode, + List<Symbol> orderingSymbols, + List<SortOrder> sortOrders) { + Comparator<DeviceEntry> comparator = null; + for (int i = 0; i < orderingSymbols.size(); i++) { + Symbol symbol = orderingSymbols.get(i); + if (externalTsFileScanNode.isTimeColumn(symbol)) { + break; + } + int tagIndex = getExternalTsFileTagIndex(externalTsFileScanNode, symbol); + final int deviceSegmentIndex = tagIndex + 1; + SortOrder sortOrder = sortOrders.get(i); + Comparator<String> valueComparator = + sortOrder.isNullsFirst() + ? Comparator.nullsFirst(Comparator.naturalOrder()) + : Comparator.nullsLast(Comparator.naturalOrder()); + Comparator<DeviceEntry> currentComparator = + Comparator.comparing( + deviceEntry -> getDeviceSegment(deviceEntry.getDeviceID(), deviceSegmentIndex), + valueComparator); + if (!sortOrder.isAscending()) { + currentComparator = currentComparator.reversed(); + } + comparator = + comparator == null ? currentComparator : comparator.thenComparing(currentComparator); } + return comparator == null ? null : comparator.thenComparing(DeviceEntry::getDeviceID); + } + + private int getExternalTsFileTagIndex( + ExternalTsFileScanNode externalTsFileScanNode, Symbol symbol) { + int tagIndex = 0; + for (Map.Entry<Symbol, ColumnSchema> entry : + externalTsFileScanNode.getAssignments().entrySet()) { + if (entry.getValue().getColumnCategory() != TsTableColumnCategory.TAG) { + continue; + } + if (entry.getKey().equals(symbol)) { + return tagIndex; + } + tagIndex++; + } + throw new IllegalArgumentException("Unexpected external TsFile ordering symbol: " + symbol); + } + + private String getDeviceSegment(IDeviceID deviceID, int deviceSegmentIndex) { + return deviceSegmentIndex < deviceID.segmentNum() + ? (String) deviceID.segment(deviceSegmentIndex) + : null; } private Optional<IDeviceID.TreeDeviceIdColumnValueExtractor> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java index 5a7ab6eadfa..42c1c8025a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java @@ -188,11 +188,11 @@ public class PruneTableScanColumns extends ProjectOffPushDownRule<TableScanNode> externalTsFileScanNode.getPushDownPredicate(), externalTsFileScanNode.getPushDownLimit(), externalTsFileScanNode.getPushDownOffset(), - externalTsFileScanNode.getTagPredicate().orElse(null), externalTsFileScanNode.getTimePredicate().orElse(null), externalTsFileScanNode.getScanOrder(), - externalTsFileScanNode.getPushedOrderingScheme().orElse(null), - externalTsFileScanNode.getTsFilePaths())); + externalTsFileScanNode.getTsFilePaths(), + externalTsFileScanNode.getDeviceEntries(), + externalTsFileScanNode.getDeviceOffsets())); } else if (node instanceof InformationSchemaTableScanNode) { // For the convenience of process in execution stage, column-prune for // InformationSchemaTableScanNode is diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java index e60cebfdf29..71184f21eed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java @@ -21,18 +21,16 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.node; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.IPlanVisitor; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.TableScanNode; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName; -import org.apache.iotdb.commons.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; -import org.apache.tsfile.utils.ReadWriteIOUtils; - import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -44,10 +42,10 @@ import java.util.Optional; public class ExternalTsFileScanNode extends TableScanNode { private List<String> tsFilePaths; - private Expression tagPredicate; private Expression timePredicate; private Ordering scanOrder = Ordering.ASC; - private OrderingScheme pushedOrderingScheme; + private List<DeviceEntry> deviceEntries = Collections.emptyList(); + private List<List<ExternalTsFileDeviceOffset>> deviceOffsets = Collections.emptyList(); protected ExternalTsFileScanNode() {} @@ -57,8 +55,28 @@ public class ExternalTsFileScanNode extends TableScanNode { List<Symbol> outputSymbols, Map<Symbol, ColumnSchema> assignments, List<String> tsFilePaths) { + this( + id, + qualifiedObjectName, + outputSymbols, + assignments, + tsFilePaths, + Collections.emptyList(), + Collections.emptyList()); + } + + public ExternalTsFileScanNode( + PlanNodeId id, + QualifiedObjectName qualifiedObjectName, + List<Symbol> outputSymbols, + Map<Symbol, ColumnSchema> assignments, + List<String> tsFilePaths, + List<DeviceEntry> deviceEntries, + List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { super(id, qualifiedObjectName, outputSymbols, assignments); this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(tsFilePaths)); + this.deviceEntries = new ArrayList<>(deviceEntries); + this.deviceOffsets = copyDeviceOffsets(deviceOffsets); } public ExternalTsFileScanNode( @@ -70,7 +88,6 @@ public class ExternalTsFileScanNode extends TableScanNode { long pushDownLimit, long pushDownOffset, Ordering scanOrder, - OrderingScheme pushedOrderingScheme, List<String> tsFilePaths) { this( id, @@ -80,11 +97,35 @@ public class ExternalTsFileScanNode extends TableScanNode { pushDownPredicate, pushDownLimit, pushDownOffset, - null, + scanOrder, + tsFilePaths, + Collections.emptyList()); + } + + public ExternalTsFileScanNode( + PlanNodeId id, + QualifiedObjectName qualifiedObjectName, + List<Symbol> outputSymbols, + Map<Symbol, ColumnSchema> assignments, + Expression pushDownPredicate, + long pushDownLimit, + long pushDownOffset, + Ordering scanOrder, + List<String> tsFilePaths, + List<DeviceEntry> deviceEntries) { + this( + id, + qualifiedObjectName, + outputSymbols, + assignments, + pushDownPredicate, + pushDownLimit, + pushDownOffset, null, scanOrder, - pushedOrderingScheme, - tsFilePaths); + tsFilePaths, + deviceEntries, + Collections.emptyList()); } public ExternalTsFileScanNode( @@ -95,11 +136,37 @@ public class ExternalTsFileScanNode extends TableScanNode { Expression pushDownPredicate, long pushDownLimit, long pushDownOffset, - Expression tagPredicate, Expression timePredicate, Ordering scanOrder, - OrderingScheme pushedOrderingScheme, List<String> tsFilePaths) { + this( + id, + qualifiedObjectName, + outputSymbols, + assignments, + pushDownPredicate, + pushDownLimit, + pushDownOffset, + timePredicate, + scanOrder, + tsFilePaths, + Collections.emptyList(), + Collections.emptyList()); + } + + public ExternalTsFileScanNode( + PlanNodeId id, + QualifiedObjectName qualifiedObjectName, + List<Symbol> outputSymbols, + Map<Symbol, ColumnSchema> assignments, + Expression pushDownPredicate, + long pushDownLimit, + long pushDownOffset, + Expression timePredicate, + Ordering scanOrder, + List<String> tsFilePaths, + List<DeviceEntry> deviceEntries, + List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { super( id, qualifiedObjectName, @@ -108,11 +175,11 @@ public class ExternalTsFileScanNode extends TableScanNode { pushDownPredicate, pushDownLimit, pushDownOffset); - this.tagPredicate = tagPredicate; this.timePredicate = timePredicate; this.scanOrder = scanOrder; - this.pushedOrderingScheme = pushedOrderingScheme; this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(tsFilePaths)); + this.deviceEntries = new ArrayList<>(deviceEntries); + this.deviceOffsets = copyDeviceOffsets(deviceOffsets); } @Override @@ -130,23 +197,41 @@ public class ExternalTsFileScanNode extends TableScanNode { pushDownPredicate, pushDownLimit, pushDownOffset, - tagPredicate, timePredicate, scanOrder, - pushedOrderingScheme, - tsFilePaths); + tsFilePaths, + deviceEntries, + deviceOffsets); } public List<String> getTsFilePaths() { return tsFilePaths; } - public Optional<Expression> getTagPredicate() { - return Optional.ofNullable(tagPredicate); + public List<DeviceEntry> getDeviceEntries() { + return deviceEntries; } - public void setTagPredicate(Expression tagPredicate) { - this.tagPredicate = tagPredicate; + public void setDeviceEntries(List<DeviceEntry> deviceEntries) { + this.deviceEntries = new ArrayList<>(deviceEntries); + } + + public List<List<ExternalTsFileDeviceOffset>> getDeviceOffsets() { + return deviceOffsets; + } + + public void setDeviceOffsets(List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { + this.deviceOffsets = copyDeviceOffsets(deviceOffsets); + } + + private static List<List<ExternalTsFileDeviceOffset>> copyDeviceOffsets( + List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { + List<List<ExternalTsFileDeviceOffset>> copiedDeviceOffsets = + new ArrayList<>(deviceOffsets.size()); + for (List<ExternalTsFileDeviceOffset> offsets : deviceOffsets) { + copiedDeviceOffsets.add(new ArrayList<>(offsets)); + } + return copiedDeviceOffsets; } public Optional<Expression> getTimePredicate() { @@ -165,104 +250,21 @@ public class ExternalTsFileScanNode extends TableScanNode { this.scanOrder = scanOrder; } - public Optional<OrderingScheme> getPushedOrderingScheme() { - return Optional.ofNullable(pushedOrderingScheme); - } - - public void setPushedOrderingScheme(OrderingScheme pushedOrderingScheme) { - this.pushedOrderingScheme = pushedOrderingScheme; - } - @Override protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.EXTERNAL_TSFILE_SCAN_NODE.serialize(byteBuffer); - TableScanNode.serializeMemberVariables(this, byteBuffer, true); - serializePredicate(tagPredicate, byteBuffer); - serializePredicate(timePredicate, byteBuffer); - ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer); - if (pushedOrderingScheme == null) { - ReadWriteIOUtils.write(false, byteBuffer); - } else { - ReadWriteIOUtils.write(true, byteBuffer); - pushedOrderingScheme.serialize(byteBuffer); - } - serializeTsFilePaths(byteBuffer); + throw new UnsupportedOperationException( + "ExternalTsFileScanNode cannot be serialized because it reads local external TsFiles"); } @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.EXTERNAL_TSFILE_SCAN_NODE.serialize(stream); - TableScanNode.serializeMemberVariables(this, stream, true); - serializePredicate(tagPredicate, stream); - serializePredicate(timePredicate, stream); - ReadWriteIOUtils.write(scanOrder.ordinal(), stream); - if (pushedOrderingScheme == null) { - ReadWriteIOUtils.write(false, stream); - } else { - ReadWriteIOUtils.write(true, stream); - pushedOrderingScheme.serialize(stream); - } - serializeTsFilePaths(stream); - } - - private void serializeTsFilePaths(ByteBuffer byteBuffer) { - ReadWriteIOUtils.write(tsFilePaths.size(), byteBuffer); - for (String tsFilePath : tsFilePaths) { - ReadWriteIOUtils.write(tsFilePath, byteBuffer); - } - } - - private void serializeTsFilePaths(DataOutputStream stream) throws IOException { - ReadWriteIOUtils.write(tsFilePaths.size(), stream); - for (String tsFilePath : tsFilePaths) { - ReadWriteIOUtils.write(tsFilePath, stream); - } - } - - private void serializePredicate(Expression predicate, ByteBuffer byteBuffer) { - if (predicate == null) { - ReadWriteIOUtils.write(false, byteBuffer); - } else { - ReadWriteIOUtils.write(true, byteBuffer); - Expression.serialize(predicate, byteBuffer); - } - } - - private void serializePredicate(Expression predicate, DataOutputStream stream) - throws IOException { - if (predicate == null) { - ReadWriteIOUtils.write(false, stream); - } else { - ReadWriteIOUtils.write(true, stream); - Expression.serialize(predicate, stream); - } + throw new UnsupportedOperationException( + "ExternalTsFileScanNode cannot be serialized because it reads local external TsFiles"); } public static ExternalTsFileScanNode deserialize(ByteBuffer byteBuffer) { - ExternalTsFileScanNode node = new ExternalTsFileScanNode(); - TableScanNode.deserializeMemberVariables(byteBuffer, node, true); - - if (ReadWriteIOUtils.readBool(byteBuffer)) { - node.tagPredicate = Expression.deserialize(byteBuffer); - } - if (ReadWriteIOUtils.readBool(byteBuffer)) { - node.timePredicate = Expression.deserialize(byteBuffer); - } - - node.scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; - if (ReadWriteIOUtils.readBool(byteBuffer)) { - node.pushedOrderingScheme = OrderingScheme.deserialize(byteBuffer); - } - - int size = ReadWriteIOUtils.readInt(byteBuffer); - List<String> tsFilePaths = new ArrayList<>(size); - while (size-- > 0) { - tsFilePaths.add(ReadWriteIOUtils.readString(byteBuffer)); - } - node.tsFilePaths = Collections.unmodifiableList(tsFilePaths); - - node.setPlanNodeId(PlanNodeId.deserialize(byteBuffer)); - return node; + throw new UnsupportedOperationException( + "ExternalTsFileScanNode cannot be deserialized because it reads local external TsFiles"); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index 361e2d540bb..05df24c0cd6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -47,12 +47,18 @@ import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Node; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.NullLiteral; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.StringLiteral; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.SymbolReference; +import org.apache.iotdb.commons.queryengine.plan.relational.type.InternalTypeManager; import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils; +import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.commons.schema.table.InformationSchema; +import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.commons.schema.table.column.TagColumnSchema; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileDeviceFilterVisitor; import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; @@ -61,6 +67,8 @@ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.ConvertPredicateToTimeFilterVisitor; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicateCombineIntoTableScanChecker; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicatePushIntoMetadataChecker; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.schema.ConvertSchemaPredicateToFilterVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.metadata.NonAlignedDeviceEntry; @@ -79,18 +87,24 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceVi import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.LazyTsFileDeviceIterator; +import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.common.type.Type; import org.apache.tsfile.read.filter.basic.Filter; +import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; import javax.annotation.Nullable; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -118,6 +132,7 @@ import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATE_ import static org.apache.iotdb.commons.schema.table.InformationSchema.CURRENT_QUERIES; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ATTRIBUTE; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.FIELD; +import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TAG; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME; import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER; import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.SCHEMA_FETCHER; @@ -476,6 +491,7 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { public PlanNode visitExternalTsFileScan( ExternalTsFileScanNode tableScanNode, RewriteContext context) { if (TRUE_LITERAL.equals(context.inheritedPredicate)) { + collectExternalTsFileDeviceTasks(tableScanNode, Collections.emptyList()); return tableScanNode; } @@ -524,11 +540,8 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { getDeviceEntriesWithDataPartitions( (DeviceTableScanNode) tableScanNode, splitExpression.getMetadataExpressions()); } else if (tableScanNode instanceof ExternalTsFileScanNode) { - ((ExternalTsFileScanNode) tableScanNode) - .setTagPredicate( - splitExpression.getMetadataExpressions().isEmpty() - ? null - : combineConjuncts(splitExpression.getMetadataExpressions())); + collectExternalTsFileDeviceTasks( + (ExternalTsFileScanNode) tableScanNode, splitExpression.getMetadataExpressions()); } // exist expressions can not push down to scan operator @@ -545,6 +558,77 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { return tableScanNode; } + private void collectExternalTsFileDeviceTasks( + ExternalTsFileScanNode tableScanNode, List<Expression> metadataExpressions) { + SchemaFilter deviceFilter = + constructExternalTsFileDeviceFilter(tableScanNode, metadataExpressions); + ExternalTsFileDeviceFilterVisitor deviceFilterVisitor = + new ExternalTsFileDeviceFilterVisitor(); + Map<IDeviceID, List<ExternalTsFileDeviceOffset>> deviceOffsetMap = new LinkedHashMap<>(); + for (String tsFilePath : tableScanNode.getTsFilePaths()) { + try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFilePath)) { + LazyTsFileDeviceIterator deviceIterator = + new LazyTsFileDeviceIterator( + reader, tableScanNode.getQualifiedObjectName().getObjectName(), ignored -> {}); + while (deviceIterator.hasNext()) { + IDeviceID deviceID = deviceIterator.next(); + if (deviceFilter != null + && !Boolean.TRUE.equals(deviceFilter.accept(deviceFilterVisitor, deviceID))) { + continue; + } + deviceOffsetMap + .computeIfAbsent(deviceID, ignored -> new ArrayList<>()) + .add( + new ExternalTsFileDeviceOffset( + tsFilePath, deviceIterator.getCurrentDeviceMeasurementNodeOffset())); + } + } catch (IOException e) { + throw new SemanticException( + "Failed to collect devices from external TsFile: " + tsFilePath); + } + } + List<DeviceEntry> deviceEntries = new ArrayList<>(deviceOffsetMap.size()); + List<List<ExternalTsFileDeviceOffset>> deviceOffsets = + new ArrayList<>(deviceOffsetMap.size()); + for (Map.Entry<IDeviceID, List<ExternalTsFileDeviceOffset>> entry : + deviceOffsetMap.entrySet()) { + deviceEntries.add(new AlignedDeviceEntry(entry.getKey(), new Binary[0])); + deviceOffsets.add(entry.getValue()); + } + tableScanNode.setDeviceEntries(deviceEntries); + tableScanNode.setDeviceOffsets(deviceOffsets); + } + + private SchemaFilter constructExternalTsFileDeviceFilter( + ExternalTsFileScanNode tableScanNode, List<Expression> metadataExpressions) { + if (metadataExpressions.isEmpty()) { + return null; + } + TsTable table = new TsTable(tableScanNode.getQualifiedObjectName().getObjectName()); + for (Map.Entry<Symbol, ColumnSchema> entry : tableScanNode.getAssignments().entrySet()) { + ColumnSchema columnSchema = entry.getValue(); + if (TAG.equals(columnSchema.getColumnCategory())) { + table.addColumnSchema( + new TagColumnSchema( + entry.getKey().getName(), + InternalTypeManager.getTSDataType(columnSchema.getType()))); + } + } + Expression predicate = + metadataExpressions.size() == 1 + ? metadataExpressions.get(0) + : new LogicalExpression(LogicalExpression.Operator.AND, metadataExpressions); + SchemaFilter deviceFilter = + predicate.accept( + new ConvertSchemaPredicateToFilterVisitor(), + new ConvertSchemaPredicateToFilterVisitor.Context(table)); + if (deviceFilter == null) { + throw new UnsupportedOperationException( + "Unsupported external TsFile device filter: " + predicate); + } + return deviceFilter; + } + interface InformationSchemaTablePredicatePushDownChecker { boolean canPushDown(Expression expression); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index 389a91f300b..c2f98dd0119 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -296,11 +296,11 @@ public class UnaliasSymbolReferences implements PlanOptimizer { node.getPushDownPredicate() == null ? null : mapper.map(node.getPushDownPredicate()), node.getPushDownLimit(), node.getPushDownOffset(), - node.getTagPredicate().map(mapper::map).orElse(null), node.getTimePredicate().map(mapper::map).orElse(null), node.getScanOrder(), - node.getPushedOrderingScheme().map(mapper::map).orElse(null), - node.getTsFilePaths()), + node.getTsFilePaths(), + node.getDeviceEntries(), + node.getDeviceOffsets()), mapping); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java index 82c4133b339..d2f92f6e287 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java @@ -30,12 +30,10 @@ import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; -import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionLeafProcessor; import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification; import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification; import org.apache.iotdb.udf.api.type.Type; -import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; @@ -116,13 +114,8 @@ public class ReadTsFileTableFunction implements TableFunction { @Override public TableFunctionProcessorProvider getProcessorProvider( TableFunctionHandle tableFunctionHandle) { - ReadTsFileTableFunctionHandle handle = (ReadTsFileTableFunctionHandle) tableFunctionHandle; - return new TableFunctionProcessorProvider() { - @Override - public TableFunctionLeafProcessor getSplitProcessor() { - return new ReadTsFileLeafProcessor(handle); - } - }; + throw new UnsupportedOperationException( + "read_tsfile must be planned as an ExternalTsFileScanNode"); } private static String getRequiredStringArgument(Map<String, Argument> arguments, String name) { @@ -321,33 +314,27 @@ public class ReadTsFileTableFunction implements TableFunction { return builder.build(); } - private static class ReadTsFileLeafProcessor implements TableFunctionLeafProcessor { - private final ReadTsFileTableFunctionHandle handle; - private boolean finished; - - private ReadTsFileLeafProcessor(ReadTsFileTableFunctionHandle handle) { - this.handle = handle; - } + public static class ExternalTsFileDeviceOffset { - @Override - public void beforeStart() { - // TODO: Open TsFile resources here. - finished = true; - } + private final String tsFilePath; + private final long[] deviceMeasurementNodeOffset; - @Override - public void process(List<ColumnBuilder> columnBuilders) { - // TODO: Read one batch from TsFile resources and write values into column builders. + public ExternalTsFileDeviceOffset(String tsFilePath, long[] deviceMeasurementNodeOffset) { + this.tsFilePath = tsFilePath; + this.deviceMeasurementNodeOffset = + deviceMeasurementNodeOffset == null + ? null + : Arrays.copyOf(deviceMeasurementNodeOffset, deviceMeasurementNodeOffset.length); } - @Override - public boolean isFinish() { - return finished; + public String getTsFilePath() { + return tsFilePath; } - @Override - public void beforeDestroy() { - // TODO: Close TsFile resources here. + public long[] getDeviceMeasurementNodeOffset() { + return deviceMeasurementNodeOffset == null + ? null + : Arrays.copyOf(deviceMeasurementNodeOffset, deviceMeasurementNodeOffset.length); } }
