This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch read_tsfile_table_function in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0bc87465afb77e38d5e6c2f18d1491b84151272d Author: shuwenwei <[email protected]> AuthorDate: Thu Jun 4 19:02:52 2026 +0800 extends device table scan node --- .../plan/planner/plan/node/PlanGraphPrinter.java | 8 +- .../iterative/rule/PruneTableScanColumns.java | 43 ++++++----- .../planner/node/ExternalTsFileScanNode.java | 88 +++++++++++++--------- .../PushAggregationIntoTableScan.java | 2 + .../optimizations/PushPredicateIntoTableScan.java | 14 ++-- .../optimizations/UnaliasSymbolReferences.java | 7 +- 6 files changed, 94 insertions(+), 68 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 3ab8b283d0b..b6399820f8b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -682,12 +682,8 @@ public class PlanGraphPrinter implements PlanVisitor<List<String>, PlanGraphPrin } } if (externalTsFileScanNode != null) { - boxValue.add(String.format("ScanOrder: %s", externalTsFileScanNode.getScanOrder())); - boxValue.add(String.format("TsFilePaths: %s", externalTsFileScanNode.getTsFilePaths())); - externalTsFileScanNode - .getTimePredicate() - .ifPresent( - timePredicate -> boxValue.add(String.format("TimePredicate: %s", timePredicate))); + boxValue.add( + String.format("TsFileNumber: %s", externalTsFileScanNode.getTsFilePaths().size())); } if (node.getPushDownPredicate() != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java index 42c1c8025a7..a8a36c6328e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java @@ -83,7 +83,32 @@ public class PruneTableScanColumns extends ProjectOffPushDownRule<TableScanNode> .forEach(symbol -> newAssignments.put(symbol, node.getAssignments().get(symbol))); } - if (node instanceof DeviceTableScanNode) { + if (node instanceof ExternalTsFileScanNode) { + ExternalTsFileScanNode externalTsFileScanNode = (ExternalTsFileScanNode) node; + externalTsFileScanNode + .getTimePredicate() + .ifPresent( + timePredicate -> + SymbolsExtractor.extractUnique(timePredicate) + .forEach( + symbol -> newAssignments.put(symbol, node.getAssignments().get(symbol)))); + ExternalTsFileScanNode prunedNode = + new ExternalTsFileScanNode( + externalTsFileScanNode.getPlanNodeId(), + externalTsFileScanNode.getQualifiedObjectName(), + newOutputs, + newAssignments, + externalTsFileScanNode.getPushDownPredicate(), + externalTsFileScanNode.getPushDownLimit(), + externalTsFileScanNode.getPushDownOffset(), + externalTsFileScanNode.getTimePredicate().orElse(null), + externalTsFileScanNode.getScanOrder(), + externalTsFileScanNode.getTsFilePaths(), + externalTsFileScanNode.getDeviceEntries(), + externalTsFileScanNode.getDeviceOffsets()); + prunedNode.setRegionReplicaSet(externalTsFileScanNode.getRegionReplicaSet()); + return Optional.of(prunedNode); + } else if (node instanceof DeviceTableScanNode) { DeviceTableScanNode deviceTableScanNode = (DeviceTableScanNode) node; // add time entry if TimePredicate exists deviceTableScanNode @@ -177,22 +202,6 @@ public class PruneTableScanColumns extends ProjectOffPushDownRule<TableScanNode> prunedNode.setRegionReplicaSet(deviceTableScanNode.getRegionReplicaSet()); return Optional.of(prunedNode); } - } else if (node instanceof ExternalTsFileScanNode) { - ExternalTsFileScanNode externalTsFileScanNode = (ExternalTsFileScanNode) node; - return Optional.of( - new ExternalTsFileScanNode( - externalTsFileScanNode.getPlanNodeId(), - externalTsFileScanNode.getQualifiedObjectName(), - newOutputs, - newAssignments, - externalTsFileScanNode.getPushDownPredicate(), - externalTsFileScanNode.getPushDownLimit(), - externalTsFileScanNode.getPushDownOffset(), - externalTsFileScanNode.getTimePredicate().orElse(null), - externalTsFileScanNode.getScanOrder(), - externalTsFileScanNode.getTsFilePaths(), - externalTsFileScanNode.getDeviceEntries(), - externalTsFileScanNode.getDeviceOffsets())); } else if (node instanceof InformationSchemaTableScanNode) { // For the convenience of process in execution stage, column-prune for // InformationSchemaTableScanNode is diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java index 71184f21eed..0b88692575f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.node; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.IPlanVisitor; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.TableScanNode; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol; @@ -38,13 +37,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; -public class ExternalTsFileScanNode extends TableScanNode { +import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TAG; + +public class ExternalTsFileScanNode extends DeviceTableScanNode { private List<String> tsFilePaths; - private Expression timePredicate; - private Ordering scanOrder = Ordering.ASC; - private List<DeviceEntry> deviceEntries = Collections.emptyList(); private List<List<ExternalTsFileDeviceOffset>> deviceOffsets = Collections.emptyList(); protected ExternalTsFileScanNode() {} @@ -73,7 +70,7 @@ public class ExternalTsFileScanNode extends TableScanNode { List<String> tsFilePaths, List<DeviceEntry> deviceEntries, List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { - super(id, qualifiedObjectName, outputSymbols, assignments); + super(id, qualifiedObjectName, outputSymbols, assignments, buildTagIndexMap(assignments)); this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(tsFilePaths)); this.deviceEntries = new ArrayList<>(deviceEntries); this.deviceOffsets = copyDeviceOffsets(deviceOffsets); @@ -167,18 +164,51 @@ public class ExternalTsFileScanNode extends TableScanNode { List<String> tsFilePaths, List<DeviceEntry> deviceEntries, List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { + this( + id, + qualifiedObjectName, + outputSymbols, + assignments, + pushDownPredicate, + pushDownLimit, + pushDownOffset, + timePredicate, + scanOrder, + false, + tsFilePaths, + deviceEntries, + deviceOffsets); + } + + public ExternalTsFileScanNode( + PlanNodeId id, + QualifiedObjectName qualifiedObjectName, + List<Symbol> outputSymbols, + Map<Symbol, ColumnSchema> assignments, + Expression pushDownPredicate, + long pushDownLimit, + long pushDownOffset, + Expression timePredicate, + Ordering scanOrder, + boolean pushLimitToEachDevice, + List<String> tsFilePaths, + List<DeviceEntry> deviceEntries, + List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { super( id, qualifiedObjectName, outputSymbols, assignments, + new ArrayList<>(deviceEntries), + buildTagIndexMap(assignments), + scanOrder, + timePredicate, pushDownPredicate, pushDownLimit, - pushDownOffset); - this.timePredicate = timePredicate; - this.scanOrder = scanOrder; + pushDownOffset, + pushLimitToEachDevice, + false); this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(tsFilePaths)); - this.deviceEntries = new ArrayList<>(deviceEntries); this.deviceOffsets = copyDeviceOffsets(deviceOffsets); } @@ -199,6 +229,7 @@ public class ExternalTsFileScanNode extends TableScanNode { pushDownOffset, timePredicate, scanOrder, + pushLimitToEachDevice, tsFilePaths, deviceEntries, deviceOffsets); @@ -208,14 +239,6 @@ public class ExternalTsFileScanNode extends TableScanNode { return tsFilePaths; } - public List<DeviceEntry> getDeviceEntries() { - return deviceEntries; - } - - public void setDeviceEntries(List<DeviceEntry> deviceEntries) { - this.deviceEntries = new ArrayList<>(deviceEntries); - } - public List<List<ExternalTsFileDeviceOffset>> getDeviceOffsets() { return deviceOffsets; } @@ -234,22 +257,6 @@ public class ExternalTsFileScanNode extends TableScanNode { return copiedDeviceOffsets; } - public Optional<Expression> getTimePredicate() { - return Optional.ofNullable(timePredicate); - } - - public void setTimePredicate(Expression timePredicate) { - this.timePredicate = timePredicate; - } - - public Ordering getScanOrder() { - return scanOrder; - } - - public void setScanOrder(Ordering scanOrder) { - this.scanOrder = scanOrder; - } - @Override protected void serializeAttributes(ByteBuffer byteBuffer) { throw new UnsupportedOperationException( @@ -271,4 +278,15 @@ public class ExternalTsFileScanNode extends TableScanNode { public String toString() { return "ExternalTsFileScanNode-" + this.getPlanNodeId(); } + + private static Map<Symbol, Integer> buildTagIndexMap(Map<Symbol, ColumnSchema> assignments) { + Map<Symbol, Integer> tagIndexMap = new java.util.HashMap<>(); + int tagIndex = 0; + for (Map.Entry<Symbol, ColumnSchema> entry : assignments.entrySet()) { + if (TAG.equals(entry.getValue().getColumnCategory())) { + tagIndexMap.put(entry.getKey(), tagIndex++); + } + } + return tagIndexMap; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java index 30ed96dd7a9..b5072bfd5c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode; import com.google.common.collect.ImmutableSet; import org.apache.tsfile.utils.Pair; @@ -104,6 +105,7 @@ public class PushAggregationIntoTableScan implements PlanOptimizer { // only optimize AggregationNode with raw DeviceTableScanNode if (tableScanNode == null + || tableScanNode instanceof ExternalTsFileScanNode || tableScanNode instanceof AggregationTableScanNode) { // no need to optimize return node; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index 05df24c0cd6..af6ddc5db1c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -517,10 +517,10 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { Pair<Expression, Boolean> resultPair = extractGlobalTimeFilter(pushDownPredicate, splitExpression.getTimeColumnName()); Boolean hasValueFilter = resultPair.getRight(); - if (tableScanNode instanceof DeviceTableScanNode && resultPair.left != null) { - ((DeviceTableScanNode) tableScanNode).setTimePredicate(resultPair.left); - } else if (tableScanNode instanceof ExternalTsFileScanNode && resultPair.left != null) { + if (tableScanNode instanceof ExternalTsFileScanNode && resultPair.left != null) { ((ExternalTsFileScanNode) tableScanNode).setTimePredicate(resultPair.left); + } else if (tableScanNode instanceof DeviceTableScanNode && resultPair.left != null) { + ((DeviceTableScanNode) tableScanNode).setTimePredicate(resultPair.left); } if (Boolean.TRUE.equals(hasValueFilter)) { if (pushDownPredicate instanceof LogicalExpression @@ -536,12 +536,12 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { } // do index scan after expressionCanPushDown is processed - if (tableScanNode instanceof DeviceTableScanNode) { - getDeviceEntriesWithDataPartitions( - (DeviceTableScanNode) tableScanNode, splitExpression.getMetadataExpressions()); - } else if (tableScanNode instanceof ExternalTsFileScanNode) { + if (tableScanNode instanceof ExternalTsFileScanNode) { collectExternalTsFileDeviceTasks( (ExternalTsFileScanNode) tableScanNode, splitExpression.getMetadataExpressions()); + } else if (tableScanNode instanceof DeviceTableScanNode) { + getDeviceEntriesWithDataPartitions( + (DeviceTableScanNode) tableScanNode, splitExpression.getMetadataExpressions()); } // exist expressions can not push down to scan operator diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index c2f98dd0119..b72b05fbff5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -287,7 +287,7 @@ public class UnaliasSymbolReferences implements PlanOptimizer { newAssignments.put(newSymbol, handle); }); - return new PlanAndMappings( + ExternalTsFileScanNode rewrittenNode = new ExternalTsFileScanNode( node.getPlanNodeId(), node.getQualifiedObjectName(), @@ -300,8 +300,9 @@ public class UnaliasSymbolReferences implements PlanOptimizer { node.getScanOrder(), node.getTsFilePaths(), node.getDeviceEntries(), - node.getDeviceOffsets()), - mapping); + node.getDeviceOffsets()); + rewrittenNode.setRegionReplicaSet(node.getRegionReplicaSet()); + return new PlanAndMappings(rewrittenNode, mapping); } @Override
