This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch read_tsfile_table_function in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 66bcfbf2008c445a64082b930f7b53465499bdda Author: shuwenwei <[email protected]> AuthorDate: Fri Jun 5 11:27:52 2026 +0800 fix --- .../ExternalTsFileTableScanOperator.java | 74 ++-------- .../planner/DataNodeTableOperatorGenerator.java | 69 +--------- .../distribute/TableDistributedPlanGenerator.java | 151 ++------------------- .../planner/node/DeviceTableScanNode.java | 5 + .../planner/node/ExternalTsFileScanNode.java | 23 ++++ 5 files changed, 56 insertions(+), 266 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java index 0c68ec96c9b..daccb199d28 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java @@ -24,8 +24,6 @@ import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFuncti import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; -import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; -import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; @@ -35,30 +33,26 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import static org.apache.iotdb.calc.plan.planner.CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING; - -public class ExternalTsFileTableScanOperator extends AbstractTableScanOperator { +public class ExternalTsFileTableScanOperator extends TableScanOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(ExternalTsFileTableScanOperator.class); + private static final long ABSTRACT_DEVICE_TABLE_SCAN_OPERATOR_INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(AbstractDeviceTableScanOperator.class); private final String tableName; - private final List<DeviceEntry> deviceEntries; private final List<List<ExternalTsFileDeviceOffset>> deviceOffsets; - private DeviceEntry currentDeviceEntry; - private List<ExternalTsFileDeviceOffset> currentDeviceOffsets; - private int currentDeviceIndex; - public ExternalTsFileTableScanOperator( AbstractTableScanOperatorParameter parameter, String tableName, - List<DeviceEntry> deviceEntries, List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { super(parameter); this.tableName = tableName; - this.deviceEntries = new ArrayList<>(deviceEntries); this.deviceOffsets = new ArrayList<>(deviceOffsets); - this.currentDeviceIndex = 0; + if (deviceCount != this.deviceOffsets.size()) { + throw new IllegalArgumentException( + "The size of external TsFile device offsets should be equal to device entries"); + } } @Override @@ -72,50 +66,6 @@ public class ExternalTsFileTableScanOperator extends AbstractTableScanOperator { return segment == null ? null : (String) segment; } - @Override - public void initQueryDataSource(IQueryDataSource dataSource) { - super.initQueryDataSource(dataSource); - currentDeviceEntry = nextDeviceEntry(); - recordCurrentDeviceIndex(); - constructAlignedSeriesScanUtil(); - if (seriesScanUtil != null) { - seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource); - } - } - - private DeviceEntry nextDeviceEntry() { - if (currentDeviceIndex >= deviceEntries.size()) { - currentDeviceOffsets = null; - return null; - } - DeviceEntry deviceEntry = deviceEntries.get(currentDeviceIndex); - currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex); - return deviceEntry; - } - - @Override - protected boolean hasCurrentDeviceEntry() { - return currentDeviceEntry != null; - } - - @Override - protected DeviceEntry getCurrentDeviceEntry() { - return currentDeviceEntry; - } - - @Override - protected boolean advanceDeviceEntry() { - currentDeviceIndex++; - currentDeviceEntry = nextDeviceEntry(); - return currentDeviceEntry != null; - } - - @Override - protected void recordCurrentDeviceIndex() { - operatorContext.recordSpecifiedInfo( - CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex)); - } - @Override protected void constructAlignedSeriesScanUtil() { if (!hasCurrentDeviceEntry()) { @@ -141,12 +91,14 @@ public class ExternalTsFileTableScanOperator extends AbstractTableScanOperator { private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( TsFileResource resource, AlignedFullPath alignedPath) throws IOException { + List<ExternalTsFileDeviceOffset> currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex); if (currentDeviceOffsets == null || !getCurrentDeviceEntry().getDeviceID().equals(alignedPath.getDeviceId())) { return null; } - long[] deviceMeasurementNodeOffset = getDeviceMeasurementNodeOffset(resource.getTsFilePath()); + long[] deviceMeasurementNodeOffset = + getDeviceMeasurementNodeOffset(currentDeviceOffsets, resource.getTsFilePath()); if (deviceMeasurementNodeOffset == null) { return null; } @@ -161,7 +113,8 @@ public class ExternalTsFileTableScanOperator extends AbstractTableScanOperator { ((OperatorContext) operatorContext).getInstanceContext().isIgnoreAllNullRows()); } - private long[] getDeviceMeasurementNodeOffset(String tsFilePath) { + private long[] getDeviceMeasurementNodeOffset( + List<ExternalTsFileDeviceOffset> currentDeviceOffsets, String tsFilePath) { for (ExternalTsFileDeviceOffset offset : currentDeviceOffsets) { if (tsFilePath.equals(offset.getTsFilePath())) { return offset.getDeviceMeasurementNodeOffset(); @@ -174,8 +127,7 @@ public class ExternalTsFileTableScanOperator extends AbstractTableScanOperator { public long ramBytesUsed() { return super.ramBytesUsed() + INSTANCE_SIZE - - AbstractTableScanOperator.INSTANCE_SIZE - + RamUsageEstimator.sizeOfCollection(deviceEntries) + - ABSTRACT_DEVICE_TABLE_SCAN_OPERATOR_INSTANCE_SIZE + RamUsageEstimator.sizeOfCollection(deviceOffsets); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java index 6faca66d7aa..5ceeef9f86b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java @@ -1127,14 +1127,16 @@ public class DataNodeTableOperatorGenerator public Operator visitExternalTsFileScan( ExternalTsFileScanNode node, LocalExecutionPlanContext context) { AbstractTableScanOperator.AbstractTableScanOperatorParameter parameter = - constructExternalTsFileTableScanOperatorParameter(node, context); + constructAbstractTableScanOperatorParameter( + node, + context, + ExternalTsFileTableScanOperator.class.getSimpleName(), + Collections.emptyMap(), + Long.MAX_VALUE); AbstractTableScanOperator externalTsFileTableScanOperator = new ExternalTsFileTableScanOperator( - parameter, - node.getQualifiedObjectName().getObjectName(), - node.getDeviceEntries(), - node.getDeviceOffsets()); + parameter, node.getQualifiedObjectName().getObjectName(), node.getDeviceOffsets()); context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName()); @@ -1147,63 +1149,6 @@ public class DataNodeTableOperatorGenerator return externalTsFileTableScanOperator; } - private AbstractTableScanOperator.AbstractTableScanOperatorParameter - constructExternalTsFileTableScanOperatorParameter( - ExternalTsFileScanNode node, LocalExecutionPlanContext context) { - CommonTableScanOperatorParameters commonParameter = - new CommonTableScanOperatorParameters( - node, Collections.emptyMap(), false, buildTagAndAttributeColumnsIndexMap(node)); - SeriesScanOptions seriesScanOptions = - buildSeriesScanOptions( - context, - commonParameter.columnSchemaMap, - commonParameter.measurementColumnNames, - commonParameter.measurementColumnsIndexMap, - commonParameter.timeColumnName, - node.getTimePredicate(), - node.getPushDownLimit(), - node.getPushDownOffset(), - false, - node.getPushDownPredicate()); - - OperatorContext operatorContext = - addOperatorContext( - context, node.getPlanNodeId(), ExternalTsFileTableScanOperator.class.getSimpleName()); - - Set<String> allSensors = new HashSet<>(commonParameter.measurementColumnNames); - // for time column - allSensors.add(""); - - return new AbstractTableScanOperator.AbstractTableScanOperatorParameter( - allSensors, - operatorContext, - node.getPlanNodeId(), - commonParameter.columnSchemas, - commonParameter.columnsIndexArray, - Collections.emptyList(), - node.getScanOrder(), - seriesScanOptions, - commonParameter.measurementColumnNames, - commonParameter.measurementSchemas, - TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()); - } - - private static Map<Symbol, Integer> buildTagAndAttributeColumnsIndexMap(TableScanNode node) { - Map<Symbol, Integer> tagAndAttributeColumnsIndexMap = new HashMap<>(); - int index = 0; - for (Map.Entry<Symbol, ColumnSchema> entry : node.getAssignments().entrySet()) { - switch (entry.getValue().getColumnCategory()) { - case TAG: - case ATTRIBUTE: - tagAndAttributeColumnsIndexMap.put(entry.getKey(), index++); - break; - default: - break; - } - } - return tagAndAttributeColumnsIndexMap; - } - private SeriesScanOptions.Builder getSeriesScanOptionsBuilder( LocalExecutionPlanContext context, @NotNull Expression timePredicate) { SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index d6cc700a14f..104ed577882 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -736,23 +736,19 @@ public class TableDistributedPlanGenerator new TRegionReplicaSet(null, ImmutableList.of(DataNodeEndPoints.getLocalDataNodeLocation())); node.setRegionReplicaSet(localRegionReplicaSet); context.mostUsedRegion = node.getRegionReplicaSet(); + List<PlanNode> resultNodes = + splitExternalTsFileScanByDeviceEntries(node, localRegionReplicaSet); if (context.hasSortProperty) { - processExternalTsFileSortProperty(node, context); - return Collections.singletonList(node); - } - - List<PlanNode> splitNodes = splitExternalTsFileScanByDeviceEntries(node, localRegionReplicaSet); - if (!splitNodes.isEmpty()) { - return splitNodes; + processSortProperty(node, resultNodes, context); } - return Collections.singletonList(node); + return resultNodes; } private List<PlanNode> splitExternalTsFileScanByDeviceEntries( final ExternalTsFileScanNode node, final TRegionReplicaSet localRegionReplicaSet) { List<DeviceEntry> deviceEntries = node.getDeviceEntries(); if (deviceEntries.size() <= 1) { - return Collections.emptyList(); + return Collections.singletonList(node); } int splitCount = @@ -760,7 +756,7 @@ public class TableDistributedPlanGenerator deviceEntries.size(), IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism()); if (splitCount <= 1) { - return Collections.emptyList(); + return Collections.singletonList(node); } List<List<DeviceEntry>> splitDeviceEntries = new ArrayList<>(splitCount); @@ -791,6 +787,7 @@ public class TableDistributedPlanGenerator node.getPushDownOffset(), node.getTimePredicate().orElse(null), node.getScanOrder(), + node.isPushLimitToEachDevice(), node.getTsFilePaths(), entries, splitDeviceOffsets.get(i)); @@ -1992,141 +1989,9 @@ public class TableDistributedPlanGenerator newOrderingScheme.ifPresent( orderingScheme -> nodeOrderingMap.put(scanNode.getPlanNodeId(), orderingScheme)); if (comparator != null) { - scanNode.getDeviceEntries().sort(comparator); - } - } - } - - private void processExternalTsFileSortProperty( - final ExternalTsFileScanNode externalTsFileScanNode, final PlanContext context) { - final OrderingScheme expectedOrderingScheme = context.expectedOrderingScheme; - final List<Symbol> newOrderingSymbols = new ArrayList<>(); - final List<SortOrder> newSortOrders = new ArrayList<>(); - boolean lastIsTimeRelated = false; - - for (final Symbol symbol : expectedOrderingScheme.getOrderBy()) { - if (externalTsFileScanNode.isTimeColumn(symbol)) { - if (!expectedOrderingScheme.getOrdering(symbol).isAscending()) { - externalTsFileScanNode.setScanOrder(Ordering.DESC); - } - newOrderingSymbols.add(symbol); - newSortOrders.add(expectedOrderingScheme.getOrdering(symbol)); - lastIsTimeRelated = true; - break; - } - - final ColumnSchema columnSchema = externalTsFileScanNode.getAssignments().get(symbol); - if (columnSchema == null || columnSchema.getColumnCategory() != TsTableColumnCategory.TAG) { - break; - } - - newOrderingSymbols.add(symbol); - newSortOrders.add(expectedOrderingScheme.getOrdering(symbol)); - } - - if (newOrderingSymbols.isEmpty()) { - return; - } - - OrderingScheme orderingScheme = - new OrderingScheme( - newOrderingSymbols, - IntStream.range(0, newOrderingSymbols.size()) - .boxed() - .collect(Collectors.toMap(newOrderingSymbols::get, newSortOrders::get))); - - Comparator<DeviceEntry> comparator = - createExternalTsFileDeviceEntryComparator( - externalTsFileScanNode, newOrderingSymbols, newSortOrders); - if (comparator != null) { - Map<IDeviceID, List<ExternalTsFileDeviceOffset>> offsetMap = new HashMap<>(); - List<DeviceEntry> deviceEntries = externalTsFileScanNode.getDeviceEntries(); - List<List<ExternalTsFileDeviceOffset>> deviceOffsets = - externalTsFileScanNode.getDeviceOffsets(); - for (int i = 0; i < deviceEntries.size(); i++) { - offsetMap.put(deviceEntries.get(i).getDeviceID(), deviceOffsets.get(i)); - } - externalTsFileScanNode.getDeviceEntries().sort(comparator); - List<List<ExternalTsFileDeviceOffset>> sortedDeviceOffsets = - new ArrayList<>(externalTsFileScanNode.getDeviceEntries().size()); - for (DeviceEntry deviceEntry : externalTsFileScanNode.getDeviceEntries()) { - sortedDeviceOffsets.add(offsetMap.get(deviceEntry.getDeviceID())); - } - externalTsFileScanNode.setDeviceOffsets(sortedDeviceOffsets); - } - - if (lastIsTimeRelated) { - if (newOrderingSymbols.size() > 1 - && newOrderingSymbols.size() == expectedOrderingScheme.getOrderBy().size() - && isOrderByAllIdsAndTime( - analysis.getTableColumnSchema(externalTsFileScanNode.getQualifiedObjectName()), - externalTsFileScanNode.getAssignments(), - new OrderingScheme( - newOrderingSymbols.subList(0, newOrderingSymbols.size() - 1), - IntStream.range(0, newOrderingSymbols.size() - 1) - .boxed() - .collect(Collectors.toMap(newOrderingSymbols::get, newSortOrders::get))), - newOrderingSymbols.size() - 2)) { - nodeOrderingMap.put(externalTsFileScanNode.getPlanNodeId(), orderingScheme); - } - return; - } - - if (newOrderingSymbols.size() == expectedOrderingScheme.getOrderBy().size()) { - nodeOrderingMap.put(externalTsFileScanNode.getPlanNodeId(), orderingScheme); - } - } - - private Comparator<DeviceEntry> createExternalTsFileDeviceEntryComparator( - ExternalTsFileScanNode externalTsFileScanNode, - List<Symbol> orderingSymbols, - List<SortOrder> sortOrders) { - Comparator<DeviceEntry> comparator = null; - for (int i = 0; i < orderingSymbols.size(); i++) { - Symbol symbol = orderingSymbols.get(i); - if (externalTsFileScanNode.isTimeColumn(symbol)) { - break; - } - int tagIndex = getExternalTsFileTagIndex(externalTsFileScanNode, symbol); - final int deviceSegmentIndex = tagIndex + 1; - SortOrder sortOrder = sortOrders.get(i); - Comparator<String> valueComparator = - sortOrder.isNullsFirst() - ? Comparator.nullsFirst(Comparator.naturalOrder()) - : Comparator.nullsLast(Comparator.naturalOrder()); - Comparator<DeviceEntry> currentComparator = - Comparator.comparing( - deviceEntry -> getDeviceSegment(deviceEntry.getDeviceID(), deviceSegmentIndex), - valueComparator); - if (!sortOrder.isAscending()) { - currentComparator = currentComparator.reversed(); - } - comparator = - comparator == null ? currentComparator : comparator.thenComparing(currentComparator); - } - return comparator == null ? null : comparator.thenComparing(DeviceEntry::getDeviceID); - } - - private int getExternalTsFileTagIndex( - ExternalTsFileScanNode externalTsFileScanNode, Symbol symbol) { - int tagIndex = 0; - for (Map.Entry<Symbol, ColumnSchema> entry : - externalTsFileScanNode.getAssignments().entrySet()) { - if (entry.getValue().getColumnCategory() != TsTableColumnCategory.TAG) { - continue; + scanNode.sortDeviceEntries(comparator); } - if (entry.getKey().equals(symbol)) { - return tagIndex; - } - tagIndex++; } - throw new IllegalArgumentException("Unexpected external TsFile ordering symbol: " + symbol); - } - - private String getDeviceSegment(IDeviceID deviceID, int deviceSegmentIndex) { - return deviceSegmentIndex < deviceID.segmentNum() - ? (String) deviceID.segment(deviceSegmentIndex) - : null; } private Optional<IDeviceID.TreeDeviceIdColumnValueExtractor> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java index c02c74058af..e4fceb9818b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java @@ -41,6 +41,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -275,6 +276,10 @@ public class DeviceTableScanNode extends TableScanNode { this.deviceEntries.add(deviceEntry); } + public void sortDeviceEntries(Comparator<DeviceEntry> comparator) { + this.deviceEntries.sort(comparator); + } + public void setPushLimitToEachDevice(boolean pushLimitToEachDevice) { this.pushLimitToEachDevice = pushLimitToEachDevice; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java index 0b88692575f..f21a7a755c0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java @@ -35,8 +35,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.stream.IntStream; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TAG; @@ -247,6 +249,27 @@ public class ExternalTsFileScanNode extends DeviceTableScanNode { this.deviceOffsets = copyDeviceOffsets(deviceOffsets); } + @Override + public void sortDeviceEntries(Comparator<DeviceEntry> comparator) { + int[] indexes = + IntStream.range(0, deviceEntries.size()) + .boxed() + .sorted( + (left, right) -> + comparator.compare(deviceEntries.get(left), deviceEntries.get(right))) + .mapToInt(Integer::intValue) + .toArray(); + List<DeviceEntry> sortedDeviceEntries = new ArrayList<>(deviceEntries.size()); + List<List<ExternalTsFileDeviceOffset>> sortedDeviceOffsets = + new ArrayList<>(deviceOffsets.size()); + for (int index : indexes) { + sortedDeviceEntries.add(deviceEntries.get(index)); + sortedDeviceOffsets.add(deviceOffsets.get(index)); + } + this.deviceEntries = sortedDeviceEntries; + this.deviceOffsets = sortedDeviceOffsets; + } + private static List<List<ExternalTsFileDeviceOffset>> copyDeviceOffsets( List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { List<List<ExternalTsFileDeviceOffset>> copiedDeviceOffsets =
