This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch readTsFileTableFunction in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3315146bca8574720bf6c84aed54056265762c21 Author: shuwenwei <[email protected]> AuthorDate: Thu May 28 18:46:55 2026 +0800 plan --- .../planner/DataNodeTableOperatorGenerator.java | 8 + .../plan/node/DataNodePlanNodeDeserializer.java | 3 + .../plan/planner/plan/node/PlanVisitor.java | 5 + .../plan/relational/planner/RelationPlanner.java | 65 +++ .../distribute/TableDistributedPlanGenerator.java | 11 + .../planner/node/ExternalTsFileScanNode.java | 146 +++++++ .../optimizations/UnaliasSymbolReferences.java | 30 ++ .../plan/planner/plan/node/PlanNodeType.java | 1 + .../function/TableBuiltinTableFunction.java | 6 +- .../relational/tvf/ReadTsFileTableFunction.java | 478 +++++++++++++++++++++ 10 files changed, 752 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java index b5de9a5561e..81bbfa60d13 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java @@ -122,6 +122,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode; import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode; @@ -1116,6 +1117,13 @@ public class DataNodeTableOperatorGenerator return tableScanOperator; } + @Override + public Operator visitExternalTsFileScan( + ExternalTsFileScanNode node, LocalExecutionPlanContext context) { + throw new UnsupportedOperationException( + "ExternalTsFileScanNode physical operator is not implemented yet"); + } + private SeriesScanOptions.Builder getSeriesScanOptionsBuilder( LocalExecutionPlanContext context, @NotNull Expression timePredicate) { SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java index 2a9b9b6905c..b79011de3ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java @@ -123,6 +123,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode; import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableDiskUsageInformationSchemaTableScanNode; @@ -469,6 +470,8 @@ public class DataNodePlanNodeDeserializer extends CommonPlanNodeDeserializer { return AlignedAggregationTreeDeviceViewScanNode.deserialize(buffer); case 1042: return NonAlignedAggregationTreeDeviceViewScanNode.deserialize(buffer); + case 1043: + return ExternalTsFileScanNode.deserialize(buffer); case 2000: return RelationalInsertTabletNode.deserialize(buffer); case 2001: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 54ee9d17a94..73c41ab1079 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -124,6 +124,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationT import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeAlignedDeviceViewScanNode; @@ -649,6 +650,10 @@ public interface PlanVisitor<R, C> extends ICoreQueryPlanVisitor<R, C> { return visitTableScan(node, context); } + default R visitExternalTsFileScan(ExternalTsFileScanNode node, C context) { + return 
visitTableScan(node, context); + } + default R visitInformationSchemaTableScan(InformationSchemaTableScanNode node, C context) { return visitTableScan(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index 268cd43452d..18e2c2d8cb7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.queryengine.common.SessionInfo; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.TableScanNode; import org.apache.iotdb.commons.queryengine.plan.relational.analyzer.NodeRef; +import org.apache.iotdb.commons.queryengine.plan.relational.function.TableBuiltinTableFunction; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.commons.queryengine.plan.relational.planner.Assignments; @@ -88,6 +89,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.VariableDefi import org.apache.iotdb.commons.queryengine.plan.relational.type.InternalTypeManager; import org.apache.iotdb.commons.queryengine.utils.cte.CteDataStore; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction; import org.apache.iotdb.db.i18n.DataNodeQueryMessages; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -120,6 +122,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils; import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.PredicateWithUncorrelatedScalarSubqueryReconstructor; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.rowpattern.rowpattern.RowPatternToIrRewriter; @@ -150,6 +153,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -1455,6 +1459,11 @@ public class RelationPlanner implements AstVisitor<RelationPlan, Void> { @Override public RelationPlan visitTableFunctionInvocation(TableFunctionInvocation node, Void context) { TableFunctionInvocationAnalysis functionAnalysis = analysis.getTableFunctionAnalysis(node); + if (TableBuiltinTableFunction.READ_TSFILE + .getFunctionName() + .equalsIgnoreCase(functionAnalysis.getFunctionName())) { + return planExternalTsFileScan(node, functionAnalysis); + } ImmutableList.Builder<PlanNode> sources = ImmutableList.builder(); ImmutableList.Builder<TableFunctionNode.TableArgumentProperties> sourceProperties = @@ -1583,6 +1592,62 @@ public class RelationPlanner implements AstVisitor<RelationPlan, Void> { return new RelationPlan(root, analysis.getScope(node), outputSymbols.build(), outerContext); } + private RelationPlan planExternalTsFileScan( + TableFunctionInvocation node, TableFunctionInvocationAnalysis functionAnalysis) { + if (!(functionAnalysis.getTableFunctionHandle() + instanceof 
ReadTsFileTableFunction.ReadTsFileTableFunctionHandle)) { + throw new IllegalStateException("read_tsfile table function handle is invalid"); + } + + ReadTsFileTableFunction.ReadTsFileTableFunctionHandle handle = + (ReadTsFileTableFunction.ReadTsFileTableFunctionHandle) + functionAnalysis.getTableFunctionHandle(); + Scope scope = analysis.getScope(node); + RelationType relationType = scope.getRelationType(); + + ImmutableList.Builder<Symbol> outputSymbolsBuilder = ImmutableList.builder(); + ImmutableMap.Builder<Symbol, ColumnSchema> assignmentsBuilder = ImmutableMap.builder(); + for (int i = 0; i < relationType.getAllFieldCount(); i++) { + Field field = relationType.getFieldByIndex(i); + Symbol symbol = symbolAllocator.newSymbol(field); + outputSymbolsBuilder.add(symbol); + assignmentsBuilder.put( + symbol, + new ColumnSchema( + field.getName().orElse(null), + field.getType(), + field.isHidden(), + field.getColumnCategory())); + } + + List<Symbol> outputSymbols = outputSymbolsBuilder.build(); + Map<Symbol, ColumnSchema> assignments = assignmentsBuilder.build(); + QualifiedObjectName qualifiedObjectName = + createExternalTsFileQualifiedObjectName(handle.getTableName()); + analysis.addTableSchema(qualifiedObjectName, assignments); + + return new RelationPlan( + new ExternalTsFileScanNode( + idAllocator.genPlanNodeId(), + qualifiedObjectName, + outputSymbols, + assignments, + handle.getTsFilePaths()), + scope, + outputSymbols, + outerContext); + } + + private QualifiedObjectName createExternalTsFileQualifiedObjectName(String tableName) { + String normalizedTableName = tableName.toLowerCase(Locale.ENGLISH); + if (normalizedTableName.indexOf('.') >= 0) { + return QualifiedObjectName.valueOf(normalizedTableName); + } + String databaseName = + sessionInfo.getDatabaseName().orElse("external").toLowerCase(Locale.ENGLISH); + return new QualifiedObjectName(databaseName, normalizedTableName); + } + private static void stayConsistent( String[] measurements, MeasurementSchema[] 
measurementSchemas) { int minLength = Math.min(measurements.length, measurementSchemas.length); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index d31996aceba..709db77a38e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -96,6 +96,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggre import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CopyToNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode; @@ -727,6 +728,16 @@ public class TableDistributedPlanGenerator } } + @Override + public List<PlanNode> visitExternalTsFileScan( + final ExternalTsFileScanNode node, final PlanContext context) { + node.setRegionReplicaSet( + new TRegionReplicaSet( + null, ImmutableList.of(DataNodeEndPoints.getLocalDataNodeLocation()))); + context.mostUsedRegion = node.getRegionReplicaSet(); + return Collections.singletonList(node); + } + private List<PlanNode> constructDeviceTableScanByTags( final DeviceTableScanNode node, final PlanContext context) { DataPartition 
dataPartition = analysis.getDataPartitionInfo(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java new file mode 100644 index 00000000000..7e2d4850b56 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.IPlanVisitor; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.TableScanNode; +import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; + +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class ExternalTsFileScanNode extends TableScanNode { + private List<String> tsFilePaths; + + protected ExternalTsFileScanNode() {} + + public ExternalTsFileScanNode( + PlanNodeId id, + QualifiedObjectName qualifiedObjectName, + List<Symbol> outputSymbols, + Map<Symbol, ColumnSchema> assignments, + List<String> tsFilePaths) { + super(id, qualifiedObjectName, outputSymbols, assignments); + this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(tsFilePaths)); + } + + public ExternalTsFileScanNode( + PlanNodeId id, + QualifiedObjectName qualifiedObjectName, + List<Symbol> outputSymbols, + Map<Symbol, ColumnSchema> assignments, + Expression pushDownPredicate, + long pushDownLimit, + long pushDownOffset, + List<String> tsFilePaths) { + super( + id, + qualifiedObjectName, + outputSymbols, + assignments, + pushDownPredicate, + pushDownLimit, + pushDownOffset); + this.tsFilePaths = 
Collections.unmodifiableList(new ArrayList<>(tsFilePaths)); + } + + @Override + public <R, C> R accept(IPlanVisitor<R, C> visitor, C context) { + return ((PlanVisitor<R, C>) visitor).visitExternalTsFileScan(this, context); + } + + @Override + public ExternalTsFileScanNode clone() { + return new ExternalTsFileScanNode( + getPlanNodeId(), + qualifiedObjectName, + outputSymbols, + assignments, + pushDownPredicate, + pushDownLimit, + pushDownOffset, + tsFilePaths); + } + + public List<String> getTsFilePaths() { + return tsFilePaths; + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.EXTERNAL_TSFILE_SCAN_NODE.serialize(byteBuffer); + TableScanNode.serializeMemberVariables(this, byteBuffer, true); + serializeTsFilePaths(byteBuffer); + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.EXTERNAL_TSFILE_SCAN_NODE.serialize(stream); + TableScanNode.serializeMemberVariables(this, stream, true); + serializeTsFilePaths(stream); + } + + private void serializeTsFilePaths(ByteBuffer byteBuffer) { + ReadWriteIOUtils.write(tsFilePaths.size(), byteBuffer); + for (String tsFilePath : tsFilePaths) { + ReadWriteIOUtils.write(tsFilePath, byteBuffer); + } + } + + private void serializeTsFilePaths(DataOutputStream stream) throws IOException { + ReadWriteIOUtils.write(tsFilePaths.size(), stream); + for (String tsFilePath : tsFilePaths) { + ReadWriteIOUtils.write(tsFilePath, stream); + } + } + + public static ExternalTsFileScanNode deserialize(ByteBuffer byteBuffer) { + ExternalTsFileScanNode node = new ExternalTsFileScanNode(); + TableScanNode.deserializeMemberVariables(byteBuffer, node, true); + + int size = ReadWriteIOUtils.readInt(byteBuffer); + List<String> tsFilePaths = new ArrayList<>(size); + while (size-- > 0) { + tsFilePaths.add(ReadWriteIOUtils.readString(byteBuffer)); + } + node.tsFilePaths = Collections.unmodifiableList(tsFilePaths); + + 
node.setPlanNodeId(PlanNodeId.deserialize(byteBuffer)); + return node; + } + + @Override + public String toString() { + return "ExternalTsFileScanNode-" + this.getPlanNodeId(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index 6dda2d17503..62f88860aed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -66,6 +66,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CopyToNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode; @@ -270,6 +271,35 @@ public class UnaliasSymbolReferences implements PlanOptimizer { mapping); } + @Override + public PlanAndMappings visitExternalTsFileScan( + ExternalTsFileScanNode node, UnaliasContext context) { + Map<Symbol, Symbol> mapping = new HashMap<>(context.getCorrelationMapping()); + SymbolMapper mapper = symbolMapper(mapping); + + List<Symbol> newOutputs = mapper.map(node.getOutputSymbols()); + + Map<Symbol, ColumnSchema> newAssignments = new HashMap<>(); + node.getAssignments() + 
.forEach( + (symbol, handle) -> { + Symbol newSymbol = mapper.map(symbol); + newAssignments.put(newSymbol, handle); + }); + + return new PlanAndMappings( + new ExternalTsFileScanNode( + node.getPlanNodeId(), + node.getQualifiedObjectName(), + newOutputs, + newAssignments, + node.getPushDownPredicate() == null ? null : mapper.map(node.getPushDownPredicate()), + node.getPushDownLimit(), + node.getPushDownOffset(), + node.getTsFilePaths()), + mapping); + } + @Override public PlanAndMappings visitCteScan(CteScanNode node, UnaliasContext context) { Map<Symbol, Symbol> mapping = new HashMap<>(context.getCorrelationMapping()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java index 71eb2238f15..4fb68ac1959 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -200,6 +200,7 @@ public enum PlanNodeType { TABLE_DISK_USAGE_INFORMATION_SCHEMA_TABLE_SCAN_NODE((short) 1040), ALIGNED_AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE((short) 1041), NON_ALIGNED_AGGREGATION_TREE_DEVICE_VIEW_SCAN_NODE((short) 1042), + EXTERNAL_TSFILE_SCAN_NODE((short) 1043), RELATIONAL_INSERT_TABLET((short) 2000), RELATIONAL_INSERT_ROW((short) 2001), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java index 43673a2e90c..fe54d1078cd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java +++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.function.tvf.Pattern import org.apache.iotdb.commons.udf.builtin.relational.tvf.CapacityTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.CumulateTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.HOPTableFunction; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.SessionTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.TumbleTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.VariationTableFunction; @@ -45,7 +46,8 @@ public enum TableBuiltinTableFunction { CAPACITY("capacity"), FORECAST("forecast"), PATTERN_MATCH("pattern_match"), - CLASSIFY("classify"); + CLASSIFY("classify"), + READ_TSFILE("read_tsfile"); private final String functionName; @@ -91,6 +93,8 @@ public enum TableBuiltinTableFunction { return new ForecastTableFunction(); case "classify": return new ClassifyTableFunction(); + case "read_tsfile": + return new ReadTsFileTableFunction(); default: throw new UnsupportedOperationException( String.format(QueryMessages.UNSUPPORTED_TABLE_FUNCTION, functionName)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java new file mode 100644 index 00000000000..91641c75a46 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.udf.builtin.relational.tvf; + +import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; +import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException; +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; +import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; +import org.apache.iotdb.udf.api.relational.table.argument.Argument; +import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; +import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; +import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionLeafProcessor; +import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import 
org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.iotdb.commons.schema.table.TsTable.TIME_COLUMN_NAME; + +/** Reads one or more TsFiles as a table function source. */ +public class ReadTsFileTableFunction implements TableFunction { + private static final String TABLE_NAME_PARAMETER_NAME = "TABLE_NAME"; + private static final String TSFILE_PATHS_PARAMETER_NAME = "TSFILE_PATHS"; + + @Override + public List<ParameterSpecification> getArgumentsSpecifications() { + return Arrays.asList( + ScalarParameterSpecification.builder() + .name(TABLE_NAME_PARAMETER_NAME) + .type(Type.STRING) + .build(), + ScalarParameterSpecification.builder() + .name(TSFILE_PATHS_PARAMETER_NAME) + .type(Type.STRING) + .build()); + } + + @Override + public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException { + String tableName = getRequiredStringArgument(arguments, TABLE_NAME_PARAMETER_NAME); + List<String> tsFilePaths = + parseTsFilePaths(getRequiredStringArgument(arguments, TSFILE_PATHS_PARAMETER_NAME)); + TsFileSchemaCollection schemaCollection = + collectTsFilesAndResolveSchema(tableName, tsFilePaths); + if (schemaCollection.mergedTableSchema == null) { + throw new UDFArgumentNotValidException( + "No table schema found for table " + tableName + " in TsFiles"); + } + DescribedSchema outputSchema = 
convertToDescribedSchema(schemaCollection.mergedTableSchema);
+
+    ReadTsFileTableFunctionHandle handle =
+        new ReadTsFileTableFunctionHandle(
+            tableName,
+            schemaCollection.tsFiles.stream()
+                .map(File::getAbsolutePath)
+                .collect(Collectors.toList()),
+            outputSchema);
+
+    return TableFunctionAnalysis.builder().properColumnSchema(outputSchema).handle(handle).build();
+  }
+
+  @Override
+  public TableFunctionHandle createTableFunctionHandle() {
+    return new ReadTsFileTableFunctionHandle();
+  }
+
+  @Override
+  public TableFunctionProcessorProvider getProcessorProvider(
+      TableFunctionHandle tableFunctionHandle) {
+    ReadTsFileTableFunctionHandle handle = (ReadTsFileTableFunctionHandle) tableFunctionHandle;
+    return new TableFunctionProcessorProvider() {
+      @Override
+      public TableFunctionLeafProcessor getSplitProcessor() {
+        return new ReadTsFileLeafProcessor(handle);
+      }
+    };
+  }
+
+  /**
+   * Returns the trimmed value of a required string scalar argument.
+   *
+   * @param arguments all arguments passed to the table function, keyed by parameter name
+   * @param name the parameter name to look up
+   * @return the argument value with surrounding whitespace trimmed
+   * @throws UDFArgumentNotValidException if the argument is absent or not a scalar, or if its
+   *     value is not a string or is empty after trimming
+   */
+  private static String getRequiredStringArgument(Map<String, Argument> arguments, String name) {
+    Argument argument = arguments.get(name);
+    if (!(argument instanceof ScalarArgument)) {
+      throw new UDFArgumentNotValidException("Missing scalar argument: " + name);
+    }
+    Object value = ((ScalarArgument) argument).getValue();
+    if (!(value instanceof String) || ((String) value).trim().isEmpty()) {
+      throw new UDFArgumentNotValidException("Argument " + name + " should not be empty");
+    }
+    return ((String) value).trim();
+  }
+
+  /**
+   * Splits a comma-separated list of paths, trimming each entry and dropping empty entries.
+   *
+   * @throws UDFArgumentNotValidException if no non-empty path remains
+   */
+  private static List<String> parseTsFilePaths(String tsFilePaths) {
+    List<String> paths =
+        Arrays.stream(tsFilePaths.split(","))
+            .map(String::trim)
+            .filter(path -> !path.isEmpty())
+            .collect(Collectors.toList());
+    if (paths.isEmpty()) {
+      throw new UDFArgumentNotValidException(
+          "Argument " + TSFILE_PATHS_PARAMETER_NAME + " should contain at least one path");
+    }
+    return paths;
+  }
+
+  /**
+   * Recursively walks every given path and keeps each regular file for which {@link
+   * #tryReadTableSchema} returns a schema for {@code tableName}, merging those schemas.
+   *
+   * @param tableName the table to look for inside the TsFiles
+   * @param tsFilePaths files or directories to scan
+   * @return the matching files and their merged table schema
+   * @throws UDFArgumentNotValidException if a path does not exist or cannot be scanned, if the
+   *     schemas conflict, or if no matching TsFile is found
+   */
+  private static TsFileSchemaCollection collectTsFilesAndResolveSchema(
+      String tableName, List<String> tsFilePaths) {
+    List<File> tsFiles = new ArrayList<>();
+    MergedTableSchemaBuilder schemaBuilder = null;
+    for (String tsFilePath : tsFilePaths) {
+      Path path = new File(tsFilePath).toPath();
+      if (!Files.exists(path)) {
+        throw new UDFArgumentNotValidException("TsFile path does not exist: " + tsFilePath);
+      }
+      try (Stream<Path> walkedPaths = Files.walk(path)) {
+        Iterator<Path> iterator = walkedPaths.filter(Files::isRegularFile).iterator();
+        while (iterator.hasNext()) {
+          Path filePath = iterator.next();
+          TableSchema tableSchema = tryReadTableSchema(tableName, filePath.toFile());
+          if (tableSchema == null) {
+            continue;
+          }
+          tsFiles.add(filePath.toFile());
+          if (schemaBuilder == null) {
+            schemaBuilder = new MergedTableSchemaBuilder(tableName, tableSchema);
+          } else {
+            schemaBuilder.merge(tableSchema);
+          }
+        }
+      } catch (IOException | java.io.UncheckedIOException e) {
+        // Files.walk throws IOException when opening the walk, but reports failures hit while
+        // iterating (e.g. an unreadable sub-directory) as UncheckedIOException; translate both
+        // and keep the original failure as the cause for diagnostics.
+        UDFArgumentNotValidException exception =
+            new UDFArgumentNotValidException("Failed to scan TsFile path: " + tsFilePath);
+        exception.initCause(e);
+        throw exception;
+      }
+    }
+    if (tsFiles.isEmpty()) {
+      throw new UDFArgumentNotValidException("No valid TsFiles found");
+    }
+    return new TsFileSchemaCollection(
+        tsFiles, schemaBuilder == null ? null : schemaBuilder.build());
+  }
+
+  /**
+   * Reads the schema of {@code tableName} (looked up in lower case) from a TsFile.
+   *
+   * <p>Best effort by design: unreadable files, incomplete files, files of another format version,
+   * files without the table, and any read failure all yield {@code null} so the caller simply
+   * skips them.
+   */
+  private static TableSchema tryReadTableSchema(String tableName, File tsFile) {
+    if (!tsFile.canRead()) {
+      return null;
+    }
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
+      if (!reader.isComplete() || reader.readVersionNumber() != TSFileConfig.VERSION_NUMBER) {
+        return null;
+      }
+      Map<String, TableSchema> tableSchemaMap = reader.getTableSchemaMap();
+      return tableSchemaMap.get(tableName.toLowerCase(Locale.ENGLISH));
+    } catch (Exception e) {
+      // Intentionally ignored: a file that cannot be parsed is treated as "not a usable TsFile".
+      return null;
+    }
+  }
+
+  /** Files that contain the requested table, plus the schema merged across all of them. */
+  private static class TsFileSchemaCollection {
+    private final List<File> tsFiles;
+    private final TableSchema mergedTableSchema;
+
+    private TsFileSchemaCollection(List<File> tsFiles, TableSchema mergedTableSchema) {
+      this.tsFiles = tsFiles;
+      this.mergedTableSchema = mergedTableSchema;
+    }
+  }
+
+  /**
+   * Accumulates one table schema out of the schemas found in several TsFiles.
+   *
+   * <p>Rules applied by {@link #merge}: the time column must agree by name; tag columns must share
+   * a common prefix and are extended by any longer list; field columns are unioned by
+   * case-insensitive name and must agree on data type.
+   */
+  private static class MergedTableSchemaBuilder {
+    private final String tableName;
+    private IMeasurementSchema timeColumnSchema;
+    private final List<IMeasurementSchema> tagColumnSchemas = new ArrayList<>();
+    private final Map<String, IMeasurementSchema> fieldColumnSchemaMap = new LinkedHashMap<>();
+
+    private MergedTableSchemaBuilder(String tableName, TableSchema tableSchema) {
+      this.tableName = tableName.toLowerCase(Locale.ENGLISH);
+      merge(tableSchema);
+    }
+
+    /**
+     * Splits one file's schema into time/tag/field columns and merges each group.
+     *
+     * @throws UDFArgumentNotValidException if the schema has several time columns or conflicts
+     *     with what has been merged so far
+     */
+    private void merge(TableSchema tableSchema) {
+      IMeasurementSchema currentTimeColumn = null;
+      List<IMeasurementSchema> currentTagColumns = new ArrayList<>();
+      List<IMeasurementSchema> currentFieldColumns = new ArrayList<>();
+      List<IMeasurementSchema> columnSchemas = tableSchema.getColumnSchemas();
+      List<ColumnCategory> columnCategories = tableSchema.getColumnTypes();
+
+      for (int i = 0; i < columnCategories.size(); i++) {
+        if (columnCategories.get(i) == ColumnCategory.TIME) {
+          if (currentTimeColumn != null) {
+            throw new UDFArgumentNotValidException(
+                "Multiple time columns found when merging table schema for table " + tableName);
+          }
+          currentTimeColumn = columnSchemas.get(i);
+        } else if (columnCategories.get(i) == ColumnCategory.TAG) {
+          currentTagColumns.add(columnSchemas.get(i));
+        } else if (columnCategories.get(i) == ColumnCategory.FIELD) {
+          currentFieldColumns.add(columnSchemas.get(i));
+        }
+      }
+
+      mergeTimeColumn(currentTimeColumn);
+      mergeTagColumns(currentTagColumns);
+      mergeFieldColumns(currentFieldColumns);
+    }
+
+    private void mergeTimeColumn(IMeasurementSchema currentTimeColumn) {
+      if (currentTimeColumn == null) {
+        return;
+      }
+      if (timeColumnSchema == null) {
+        // NOTE(review): the first time column is accepted with any data type, while every later
+        // one must be TIMESTAMP; confirm which rule is intended.
+        timeColumnSchema = currentTimeColumn;
+        return;
+      }
+      if (!timeColumnSchema.getMeasurementName().equals(currentTimeColumn.getMeasurementName())
+          || currentTimeColumn.getType() != TSDataType.TIMESTAMP) {
+        throw new UDFArgumentNotValidException(
+            "Time column conflicts when merging table schema for table " + tableName);
+      }
+    }
+
+    /** Requires the shared prefix of both tag lists to match, then appends the extra tags. */
+    private void mergeTagColumns(List<IMeasurementSchema> currentTagColumns) {
+      int prefixLength = Math.min(tagColumnSchemas.size(), currentTagColumns.size());
+      for (int i = 0; i < prefixLength; i++) {
+        if (!tagColumnSchemas
+            .get(i)
+            .getMeasurementName()
+            .equals(currentTagColumns.get(i).getMeasurementName())) {
+          throw new UDFArgumentNotValidException(
+              "Tag columns conflict when merging table schema for table " + tableName);
+        }
+      }
+      tagColumnSchemas.addAll(currentTagColumns.subList(prefixLength, currentTagColumns.size()));
+    }
+
+    /** Unions field columns by lower-cased name; the first schema seen for a name is kept. */
+    private void mergeFieldColumns(List<IMeasurementSchema> currentFieldColumns) {
+      for (IMeasurementSchema fieldColumn : currentFieldColumns) {
+        String fieldName = fieldColumn.getMeasurementName().toLowerCase(Locale.ENGLISH);
+        IMeasurementSchema existingColumn = fieldColumnSchemaMap.get(fieldName);
+        if (existingColumn != null && existingColumn.getType() != fieldColumn.getType()) {
+          throw new UDFArgumentNotValidException(
+              "Field column "
+                  + fieldColumn.getMeasurementName()
+                  + " has conflicting data types when merging table schema for table "
+                  + tableName);
+        }
+        fieldColumnSchemaMap.putIfAbsent(fieldName, fieldColumn);
+      }
+    }
+
+    /**
+     * Builds the merged schema: the time column first (a TIMESTAMP column named {@code
+     * TIME_COLUMN_NAME} when no file declared one), then tag columns, then field columns.
+     *
+     * @throws UDFArgumentNotValidException if one column name (ignoring case) ends up in more than
+     *     one column category, e.g. a tag in one file and a field in another
+     */
+    private TableSchema build() {
+      List<IMeasurementSchema> columnSchemas = new ArrayList<>();
+      List<ColumnCategory> columnCategories = new ArrayList<>();
+      // Lower-cased column name -> category it was first added with; prevents emitting two
+      // output columns that share a name.
+      Map<String, ColumnCategory> addedColumns = new LinkedHashMap<>();
+      addColumn(
+          columnSchemas,
+          columnCategories,
+          addedColumns,
+          timeColumnSchema == null
+              ? new MeasurementSchema(TIME_COLUMN_NAME, TSDataType.TIMESTAMP)
+              : timeColumnSchema,
+          ColumnCategory.TIME);
+      for (IMeasurementSchema tagColumnSchema : tagColumnSchemas) {
+        addColumn(
+            columnSchemas, columnCategories, addedColumns, tagColumnSchema, ColumnCategory.TAG);
+      }
+      for (IMeasurementSchema fieldColumnSchema : fieldColumnSchemaMap.values()) {
+        addColumn(
+            columnSchemas, columnCategories, addedColumns, fieldColumnSchema, ColumnCategory.FIELD);
+      }
+      return new TableSchema(tableName, columnSchemas, columnCategories);
+    }
+
+    /** Appends one column to the output lists, rejecting a name (ignoring case) already added. */
+    private void addColumn(
+        List<IMeasurementSchema> columnSchemas,
+        List<ColumnCategory> columnCategories,
+        Map<String, ColumnCategory> addedColumns,
+        IMeasurementSchema columnSchema,
+        ColumnCategory category) {
+      String columnName = columnSchema.getMeasurementName();
+      ColumnCategory previousCategory =
+          addedColumns.putIfAbsent(columnName.toLowerCase(Locale.ENGLISH), category);
+      if (previousCategory != null) {
+        throw new UDFArgumentNotValidException(
+            "Column "
+                + columnName
+                + " is declared as both "
+                + previousCategory
+                + " and "
+                + category
+                + " when merging table schema for table "
+                + tableName);
+      }
+      columnSchemas.add(columnSchema);
+      columnCategories.add(category);
+    }
+  }
+
+  /** Converts a TsFile table schema into the described output schema, preserving column order. */
+  private static DescribedSchema convertToDescribedSchema(TableSchema tableSchema) {
+    DescribedSchema.Builder builder = DescribedSchema.builder();
+    for (IMeasurementSchema columnSchema : tableSchema.getColumnSchemas()) {
+      builder.addField(
+          columnSchema.getMeasurementName(),
+          UDFDataTypeTransformer.transformToUDFDataType(columnSchema.getType()));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Leaf processor that will read the TsFiles listed in the handle.
+   *
+   * <p>Currently a placeholder: {@link #beforeStart} marks the processor finished and {@link
+   * #process} writes nothing, so no rows are produced yet.
+   */
+  private static class ReadTsFileLeafProcessor implements TableFunctionLeafProcessor {
+    private final ReadTsFileTableFunctionHandle handle;
+    private boolean finished;
+
+    private ReadTsFileLeafProcessor(ReadTsFileTableFunctionHandle handle) {
+      this.handle = handle;
+    }
+
+    @Override
+    public void beforeStart() {
+      // TODO: Open TsFile resources here.
+      finished = true;
+    }
+
+    @Override
+    public void process(List<ColumnBuilder> columnBuilders) {
+      // TODO: Read one batch from TsFile resources and write values into column builders.
+    }
+
+    @Override
+    public boolean isFinish() {
+      return finished;
+    }
+
+    @Override
+    public void beforeDestroy() {
+      // TODO: Close TsFile resources here.
+    }
+  }
+
+  /**
+   * Handle carrying the analyzed table name, the matching TsFile paths and the output schema.
+   *
+   * <p>Serialized layout: table name, path count followed by each path, column count followed by
+   * each (column name, type byte). Every string is an int byte length followed by its UTF-8
+   * bytes.
+   */
+  public static class ReadTsFileTableFunctionHandle implements TableFunctionHandle {
+    private String tableName;
+    private List<String> tsFilePaths;
+    private List<String> outputColumnNames;
+    private List<Type> outputColumnTypes;
+
+    /** Creates an empty handle, to be filled by {@link #deserialize}. */
+    public ReadTsFileTableFunctionHandle() {
+      this("", Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
+    }
+
+    public ReadTsFileTableFunctionHandle(
+        String tableName, List<String> tsFilePaths, DescribedSchema outputSchema) {
+      this(
+          tableName,
+          tsFilePaths,
+          outputSchema.getFields().stream()
+              .map(field -> field.getName().orElse(""))
+              .collect(Collectors.toList()),
+          outputSchema.getFields().stream()
+              .map(DescribedSchema.Field::getType)
+              .collect(Collectors.toList()));
+    }
+
+    private ReadTsFileTableFunctionHandle(
+        String tableName,
+        List<String> tsFilePaths,
+        List<String> outputColumnNames,
+        List<Type> outputColumnTypes) {
+      if (outputColumnNames.size() != outputColumnTypes.size()) {
+        throw new IllegalArgumentException("Output column names and types size mismatch");
+      }
+      this.tableName = tableName;
+      this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(tsFilePaths));
+      this.outputColumnNames = Collections.unmodifiableList(new ArrayList<>(outputColumnNames));
+      this.outputColumnTypes = Collections.unmodifiableList(new ArrayList<>(outputColumnTypes));
+    }
+
+    public String getTableName() {
+      return tableName;
+    }
+
+    public List<String> getTsFilePaths() {
+      return tsFilePaths;
+    }
+
+    public List<String> getOutputColumnNames() {
+      return outputColumnNames;
+    }
+
+    public List<Type> getOutputColumnTypes() {
+      return outputColumnTypes;
+    }
+
+    @Override
+    public byte[] serialize() {
+      ByteBuffer buffer = ByteBuffer.allocate(calculateSerializeSize());
+      writeString(buffer, tableName);
+      buffer.putInt(tsFilePaths.size());
+      tsFilePaths.forEach(path -> writeString(buffer, path));
+      buffer.putInt(outputColumnNames.size());
+      for (int i = 0; i < outputColumnNames.size(); i++) {
+        writeString(buffer, outputColumnNames.get(i));
+        buffer.put(outputColumnTypes.get(i).getType());
+      }
+      return buffer.array();
+    }
+
+    @Override
+    public void deserialize(byte[] bytes) {
+      ByteBuffer buffer = ByteBuffer.wrap(bytes);
+      tableName = readString(buffer);
+      int size = buffer.getInt();
+      List<String> paths = new ArrayList<>(size);
+      for (int i = 0; i < size; i++) {
+        paths.add(readString(buffer));
+      }
+      tsFilePaths = Collections.unmodifiableList(paths);
+      size = buffer.getInt();
+      List<String> columnNames = new ArrayList<>(size);
+      List<Type> columnTypes = new ArrayList<>(size);
+      for (int i = 0; i < size; i++) {
+        columnNames.add(readString(buffer));
+        columnTypes.add(Type.valueOf(buffer.get()));
+      }
+      outputColumnNames = Collections.unmodifiableList(columnNames);
+      outputColumnTypes = Collections.unmodifiableList(columnTypes);
+    }
+
+    /** Two handles are equal when all serialized state (table, paths, output schema) matches. */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ReadTsFileTableFunctionHandle that = (ReadTsFileTableFunctionHandle) o;
+      return tableName.equals(that.tableName)
+          && tsFilePaths.equals(that.tsFilePaths)
+          && outputColumnNames.equals(that.outputColumnNames)
+          && outputColumnTypes.equals(that.outputColumnTypes);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = tableName.hashCode();
+      result = 31 * result + tsFilePaths.hashCode();
+      result = 31 * result + outputColumnNames.hashCode();
+      result = 31 * result + outputColumnTypes.hashCode();
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "ReadTsFileTableFunctionHandle{"
+          + "tableName='"
+          + tableName
+          + '\''
+          + ", tsFilePaths="
+          + tsFilePaths
+          + ", outputColumnNames="
+          + outputColumnNames
+          + ", outputColumnTypes="
+          + outputColumnTypes
+          + '}';
+    }
+
+    /** Exact byte size of {@link #serialize()}'s output, so the buffer needs no resizing. */
+    private int calculateSerializeSize() {
+      int size = Integer.BYTES + tableName.getBytes(StandardCharsets.UTF_8).length;
+      size += Integer.BYTES;
+      for (String path : tsFilePaths) {
+        size += Integer.BYTES + path.getBytes(StandardCharsets.UTF_8).length;
+      }
+      size += Integer.BYTES;
+      for (String columnName : outputColumnNames) {
+        size += Integer.BYTES + columnName.getBytes(StandardCharsets.UTF_8).length + Byte.BYTES;
+      }
+      return size;
+    }
+
+    private static void writeString(ByteBuffer buffer, String value) {
+      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
+      buffer.putInt(bytes.length);
+      buffer.put(bytes);
+    }
+
+    private static String readString(ByteBuffer buffer) {
+      byte[] bytes = new byte[buffer.getInt()];
+      buffer.get(bytes);
+      return new String(bytes, StandardCharsets.UTF_8);
+    }
+  }
+}
