This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch read_tsfile_table_function in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9cf449d0eefd4743b78269da9914b863305b9971 Author: shuwenwei <[email protected]> AuthorDate: Wed Jun 10 09:54:49 2026 +0800 add external tsfile resource --- .../db/queryengine/common/MPPQueryContext.java | 51 ++ .../fragment/FragmentInstanceContext.java | 4 - .../ExternalTsFileAggTableScanOperator.java | 59 +- .../relational/ExternalTsFileSeriesScanUtil.java | 15 +- .../ExternalTsFileTableScanOperator.java | 59 +- .../queryengine/plan/execution/QueryExecution.java | 1 + .../planner/DataNodeTableOperatorGenerator.java | 10 +- .../plan/node/DataNodePlanNodeDeserializer.java | 3 - .../DataNodeTableBuiltinTableFunction.java | 10 +- .../readTsFile/ExternalTsFileQueryResource.java | 634 +++++++++++++++++++++ .../{ => readTsFile}/ReadTsFileTableFunction.java | 30 +- .../plan/relational/planner/RelationPlanner.java | 26 +- .../distribute/TableDistributedPlanGenerator.java | 204 ++++--- .../iterative/rule/PruneTableScanColumns.java | 9 +- .../planner/node/AggregationTableScanNode.java | 54 +- .../planner/node/DeviceTableScanNode.java | 5 - .../node/ExternalTsFileAggregationScanNode.java | 93 ++- .../planner/node/ExternalTsFileScanNode.java | 238 ++------ .../optimizations/PushPredicateIntoTableScan.java | 48 +- .../optimizations/UnaliasSymbolReferences.java | 9 +- 20 files changed, 1057 insertions(+), 505 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 1e5bd2bff77..8b1303ef44e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -32,20 +32,27 @@ import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Identifier; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Query; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Table; import org.apache.iotdb.commons.queryengine.utils.cte.CteDataStore; +import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType; import org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource; import org.apache.iotdb.db.queryengine.statistics.QueryPlanStatistics; import com.google.common.collect.ImmutableList; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.nio.file.Paths; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -62,6 +69,8 @@ import java.util.function.LongConsumer; * info and so on. */ public class MPPQueryContext implements IAuditEntity { + private static final Logger LOGGER = LoggerFactory.getLogger(MPPQueryContext.class); + private String sql; private final QueryId queryId; @@ -158,6 +167,8 @@ public class MPPQueryContext implements IAuditEntity { // Tables in the subquery private final Map<NodeRef<Query>, List<Identifier>> subQueryTables = new HashMap<>(); + private List<ExternalTsFileQueryResource> externalTsFileQueryResources; + @TestOnly public MPPQueryContext(QueryId queryId) { this.queryId = queryId; @@ -243,6 +254,46 @@ public class MPPQueryContext implements IAuditEntity { return queryId; } + public ExternalTsFileQueryResource createExternalTsFileQueryResource( + String tableName, List<String> tsFilePaths) { + if (externalTsFileQueryResources == null) { + externalTsFileQueryResources = new ArrayList<>(); + } + ExternalTsFileQueryResource externalTsFileQueryResource = + new ExternalTsFileQueryResource( + queryId, + Paths.get(IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()) + .resolve(ExternalTsFileQueryResource.EXTERNAL_TSFILE_TMP_DIR) + .resolve(queryId.getId()) + .resolve(String.valueOf(externalTsFileQueryResources.size())), + tableName, + tsFilePaths, + ignored -> {}, + true); + externalTsFileQueryResources.add(externalTsFileQueryResource); + return externalTsFileQueryResource; + } + + public void releaseExternalTsFileQueryResources() { + if (externalTsFileQueryResources == null) { + return; + } + for (ExternalTsFileQueryResource externalTsFileQueryResource : externalTsFileQueryResources) { + try { + externalTsFileQueryResource.close(); + } catch (Exception e) { + LOGGER.warn("Failed to release external TsFile query resource", e); + } + } + FileUtils.deleteFileOrDirectory( + Paths.get(IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()) + .resolve(ExternalTsFileQueryResource.EXTERNAL_TSFILE_TMP_DIR) + .resolve(queryId.getId()) + .toFile(), + true); + externalTsFileQueryResources = null; + } + public long getLocalQueryId() { return localQueryId; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 654df2301d8..8ae3e885f5e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -826,7 +826,6 @@ public class FragmentInstanceContext extends QueryContext { resource.setTimeIndex(new FileTimeIndex(Long.MIN_VALUE, Long.MAX_VALUE)); } externalTsFileResources.add(resource); - FileReaderManager.getInstance().increaseExternalFileReaderReference(externalTsFilePath); } this.sharedQueryDataSource = @@ -1083,9 +1082,6 @@ public class FragmentInstanceContext extends QueryContext { } if (externalTsFileResources != null) { - for (TsFileResource tsFile : externalTsFileResources) { - FileReaderManager.getInstance().decreaseExternalFileReaderReference(tsFile.getTsFilePath()); - } externalTsFileResources = null; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java index 79a101a6a79..3b4b2704103 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java @@ -20,9 +20,11 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; import org.apache.iotdb.commons.path.AlignedFullPath; -import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanUtil; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceOffset; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.MultiWayMergeReader; import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -32,7 +34,7 @@ import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.RamUsageEstimator; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; @@ -42,19 +44,21 @@ public class ExternalTsFileAggTableScanOperator extends DefaultAggTableScanOpera RamUsageEstimator.shallowSizeOfInstance(ExternalTsFileAggTableScanOperator.class); private final String tableName; - private final List<List<ExternalTsFileDeviceOffset>> deviceOffsets; + private final ExternalTsFileQueryResource externalTsFileQueryResource; + private final int deviceTaskPartitionIndex; + private MultiWayMergeReader deviceTaskReader; + private int loadedDeviceOffsetIndex = -1; + private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList(); public ExternalTsFileAggTableScanOperator( AbstractAggTableScanOperatorParameter parameter, String tableName, - List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { + ExternalTsFileQueryResource externalTsFileQueryResource, + int deviceTaskPartitionIndex) { super(parameter); this.tableName = tableName; - this.deviceOffsets = new ArrayList<>(deviceOffsets); - if (deviceCount != this.deviceOffsets.size()) { - throw new IllegalArgumentException( - "The size of external TsFile device offsets should be equal to device entries"); - } + this.externalTsFileQueryResource = externalTsFileQueryResource; + this.deviceTaskPartitionIndex = deviceTaskPartitionIndex; } @Override @@ -91,21 +95,52 @@ public class ExternalTsFileAggTableScanOperator extends DefaultAggTableScanOpera if (deviceEntries.isEmpty() || currentDeviceIndex >= deviceEntries.size()) { return null; } - List<ExternalTsFileDeviceOffset> currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex); return ExternalTsFileSeriesScanUtil.loadTimeSeriesMetadata( resource, alignedPath, deviceEntries.get(currentDeviceIndex).getDeviceID(), - currentDeviceOffsets, + getCurrentDeviceOffsets(), + externalTsFileQueryResource.getTsFilePaths(), ((OperatorContext) operatorContext).getInstanceContext(), seriesScanOptions.getGlobalTimeFilter()); } + private List<DeviceOffset> getCurrentDeviceOffsets() throws IOException { + if (loadedDeviceOffsetIndex == currentDeviceIndex) { + return currentDeviceOffsets; + } + if (deviceTaskReader == null) { + deviceTaskReader = + externalTsFileQueryResource.getMultiWayMergeReader(deviceTaskPartitionIndex); + } + DeviceEntry currentDeviceEntry = deviceEntries.get(currentDeviceIndex); + while (deviceTaskReader.hasNextDevice()) { + DeviceEntry deviceEntry = deviceTaskReader.nextDevice(); + if (deviceEntry.getDeviceID().equals(currentDeviceEntry.getDeviceID())) { + currentDeviceOffsets = deviceTaskReader.getCurrentDeviceOffsets(); + loadedDeviceOffsetIndex = currentDeviceIndex; + return currentDeviceOffsets; + } + } + currentDeviceOffsets = Collections.emptyList(); + loadedDeviceOffsetIndex = currentDeviceIndex; + return currentDeviceOffsets; + } + + @Override + public void close() throws Exception { + if (deviceTaskReader != null) { + deviceTaskReader.close(); + deviceTaskReader = null; + } + super.close(); + } + @Override public long ramBytesUsed() { return super.ramBytesUsed() + INSTANCE_SIZE - AbstractDefaultAggTableScanOperator.INSTANCE_SIZE - + RamUsageEstimator.sizeOfCollection(deviceOffsets); + + RamUsageEstimator.sizeOfCollection(currentDeviceOffsets); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java index f46e3b04346..6ad6aa2af3f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java @@ -20,11 +20,11 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; import org.apache.iotdb.commons.path.AlignedFullPath; -import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil; import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceOffset; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -86,7 +86,8 @@ public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil { TsFileResource resource, AlignedFullPath alignedPath, IDeviceID currentDeviceID, - List<ExternalTsFileDeviceOffset> currentDeviceOffsets, + List<DeviceOffset> currentDeviceOffsets, + List<String> tsFilePaths, FragmentInstanceContext context, Filter globalTimeFilter) throws IOException { @@ -95,7 +96,7 @@ public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil { } long[] deviceMeasurementNodeOffset = - getDeviceMeasurementNodeOffset(currentDeviceOffsets, resource.getTsFilePath()); + getDeviceMeasurementNodeOffset(currentDeviceOffsets, tsFilePaths, resource.getTsFilePath()); if (deviceMeasurementNodeOffset == null) { return null; } @@ -111,10 +112,10 @@ public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil { } private static long[] getDeviceMeasurementNodeOffset( - List<ExternalTsFileDeviceOffset> currentDeviceOffsets, String tsFilePath) { - for (ExternalTsFileDeviceOffset offset : currentDeviceOffsets) { - if (tsFilePath.equals(offset.getTsFilePath())) { - return offset.getDeviceMeasurementNodeOffset(); + List<DeviceOffset> currentDeviceOffsets, List<String> tsFilePaths, String tsFilePath) { + for (DeviceOffset offset : currentDeviceOffsets) { + if (tsFilePath.equals(tsFilePaths.get(offset.getFileIndex()))) { + return offset.getMeasurementNodeOffset(); } } return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java index 2ad90b1d460..ad14540e291 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java @@ -20,8 +20,10 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; import org.apache.iotdb.commons.path.AlignedFullPath; -import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceOffset; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.MultiWayMergeReader; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -29,7 +31,7 @@ import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; import org.apache.tsfile.utils.RamUsageEstimator; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; public class ExternalTsFileTableScanOperator extends TableScanOperator { @@ -39,19 +41,21 @@ public class ExternalTsFileTableScanOperator extends TableScanOperator { RamUsageEstimator.shallowSizeOfInstance(AbstractDeviceTableScanOperator.class); private final String tableName; - private final List<List<ExternalTsFileDeviceOffset>> deviceOffsets; + private final ExternalTsFileQueryResource externalTsFileQueryResource; + private final int deviceTaskPartitionIndex; + private MultiWayMergeReader deviceTaskReader; + private int loadedDeviceOffsetIndex = -1; + private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList(); public ExternalTsFileTableScanOperator( AbstractTableScanOperatorParameter parameter, String tableName, - List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { + ExternalTsFileQueryResource externalTsFileQueryResource, + int deviceTaskPartitionIndex) { super(parameter); this.tableName = tableName; - this.deviceOffsets = new ArrayList<>(deviceOffsets); - if (deviceCount != this.deviceOffsets.size()) { - throw new IllegalArgumentException( - "The size of external TsFile device offsets should be equal to device entries"); - } + this.externalTsFileQueryResource = externalTsFileQueryResource; + this.deviceTaskPartitionIndex = deviceTaskPartitionIndex; } @Override @@ -90,21 +94,52 @@ public class ExternalTsFileTableScanOperator extends TableScanOperator { private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( TsFileResource resource, AlignedFullPath alignedPath) throws IOException { - List<ExternalTsFileDeviceOffset> currentDeviceOffsets = deviceOffsets.get(currentDeviceIndex); return ExternalTsFileSeriesScanUtil.loadTimeSeriesMetadata( resource, alignedPath, getCurrentDeviceEntry().getDeviceID(), - currentDeviceOffsets, + getCurrentDeviceOffsets(), + externalTsFileQueryResource.getTsFilePaths(), ((OperatorContext) operatorContext).getInstanceContext(), seriesScanOptions.getGlobalTimeFilter()); } + private List<DeviceOffset> getCurrentDeviceOffsets() throws IOException { + if (loadedDeviceOffsetIndex == currentDeviceIndex) { + return currentDeviceOffsets; + } + if (deviceTaskReader == null) { + deviceTaskReader = + externalTsFileQueryResource.getMultiWayMergeReader(deviceTaskPartitionIndex); + } + DeviceEntry currentDeviceEntry = getCurrentDeviceEntry(); + while (deviceTaskReader.hasNextDevice()) { + DeviceEntry deviceEntry = deviceTaskReader.nextDevice(); + if (deviceEntry.getDeviceID().equals(currentDeviceEntry.getDeviceID())) { + currentDeviceOffsets = deviceTaskReader.getCurrentDeviceOffsets(); + loadedDeviceOffsetIndex = currentDeviceIndex; + return currentDeviceOffsets; + } + } + currentDeviceOffsets = Collections.emptyList(); + loadedDeviceOffsetIndex = currentDeviceIndex; + return currentDeviceOffsets; + } + + @Override + public void close() throws Exception { + if (deviceTaskReader != null) { + deviceTaskReader.close(); + deviceTaskReader = null; + } + super.close(); + } + @Override public long ramBytesUsed() { return super.ramBytesUsed() + INSTANCE_SIZE - ABSTRACT_DEVICE_TABLE_SCAN_OPERATOR_INSTANCE_SIZE - + RamUsageEstimator.sizeOfCollection(deviceOffsets); + + RamUsageEstimator.sizeOfCollection(currentDeviceOffsets); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index a5f96e3c6c0..0a921dd2dd8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -392,6 +392,7 @@ public class QueryExecution implements IQueryExecution { } cleanUpResultHandle(); } + context.releaseExternalTsFileQueryResources(); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java index 85835931f24..b0fc9e1d554 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java @@ -1138,7 +1138,10 @@ public class DataNodeTableOperatorGenerator AbstractTableScanOperator externalTsFileTableScanOperator = new ExternalTsFileTableScanOperator( - parameter, node.getQualifiedObjectName().getObjectName(), node.getDeviceOffsets()); + parameter, + node.getQualifiedObjectName().getObjectName(), + node.getExternalTsFileQueryResource(), + node.getDeviceTaskPartitionIndex()); context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName()); @@ -1638,7 +1641,10 @@ public class DataNodeTableOperatorGenerator ExternalTsFileAggTableScanOperator aggTableScanOperator = new ExternalTsFileAggTableScanOperator( - parameter, node.getQualifiedObjectName().getObjectName(), node.getDeviceOffsets()); + parameter, + node.getQualifiedObjectName().getObjectName(), + node.getExternalTsFileQueryResource(), + node.getDeviceTaskPartitionIndex()); context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName()); addSource( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java index b79011de3ec..2a9b9b6905c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java @@ -123,7 +123,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableDiskUsageInformationSchemaTableScanNode; @@ -470,8 +469,6 @@ public class DataNodePlanNodeDeserializer extends CommonPlanNodeDeserializer { return AlignedAggregationTreeDeviceViewScanNode.deserialize(buffer); case 1042: return NonAlignedAggregationTreeDeviceViewScanNode.deserialize(buffer); - case 1043: - return ExternalTsFileScanNode.deserialize(buffer); case 2000: return RelationalInsertTabletNode.deserialize(buffer); case 2001: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/DataNodeTableBuiltinTableFunction.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/DataNodeTableBuiltinTableFunction.java index 858c2ab3926..8e4d0830d41 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/DataNodeTableBuiltinTableFunction.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/DataNodeTableBuiltinTableFunction.java @@ -21,11 +21,10 @@ package org.apache.iotdb.db.queryengine.plan.relational.function; import org.apache.iotdb.commons.exception.SemanticException; import org.apache.iotdb.commons.i18n.QueryMessages; -import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.ReadTsFileTableFunction; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ReadTsFileTableFunction; import org.apache.iotdb.udf.api.relational.TableFunction; import java.util.Arrays; -import java.util.HashSet; import java.util.Set; import java.util.stream.Collectors; @@ -43,10 +42,9 @@ public enum DataNodeTableBuiltinTableFunction { } private static final Set<String> BUILT_IN_TABLE_FUNCTION_NAME = - new HashSet<>( - Arrays.stream(DataNodeTableBuiltinTableFunction.values()) - .map(DataNodeTableBuiltinTableFunction::getFunctionName) - .collect(Collectors.toList())); + Arrays.stream(DataNodeTableBuiltinTableFunction.values()) + .map(DataNodeTableBuiltinTableFunction::getFunctionName) + .collect(Collectors.toSet()); public static Set<String> getBuiltInTableFunctionName() { return BUILT_IN_TABLE_FUNCTION_NAME; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java new file mode 100644 index 00000000000..8371c6a1687 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java @@ -0,0 +1,634 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile; + +import org.apache.iotdb.commons.schema.filter.SchemaFilter; +import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileDeviceFilterVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.LazyTsFileDeviceIterator; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.function.LongConsumer; + +import static java.util.Objects.requireNonNull; + +/** + * Query-scoped resource for external TsFile scans. + * + * <p>Reader instances are owned by {@link FileReaderManager}; this class only balances reference + * increments/decrements for the query's external TsFile paths and deletes the query temporary + * directory. + */ +public class ExternalTsFileQueryResource implements AutoCloseable { + + public static final String EXTERNAL_TSFILE_TMP_DIR = "external-tsfile"; + private static final long DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES = 8L * 1024 * 1024; + private static final long DEVICE_OFFSET_INSTANCE_SIZE_IN_BYTES = 32L; + + private final QueryId queryId; + private final Path queryTempRoot; + private final String tableName; + private final List<String> tsFilePaths; + private final LongConsumer ioSizeRecorder; + private final List<DeviceEntry> deviceEntries = new ArrayList<>(); + private List<DeviceTaskPartition> deviceTaskPartitions = Collections.emptyList(); + private Comparator<DeviceEntry> deviceEntryComparator; + + private boolean readersRetained; + private boolean closed; + + public ExternalTsFileQueryResource( + QueryId queryId, + Path tempRoot, + String tableName, + List<String> tsFilePaths, + LongConsumer ioSizeRecorder, + boolean useExactTempRoot) { + this.queryId = queryId; + this.queryTempRoot = + useExactTempRoot + ? requireNonNull(tempRoot, "tempRoot is null") + : requireNonNull(tempRoot, "tempRoot is null") + .resolve(requireNonNull(queryId, "queryId is null").getId()); + this.tableName = tableName; + this.tsFilePaths = + Collections.unmodifiableList(new ArrayList<>(requireNonNull(tsFilePaths, "tsFilePaths"))); + this.ioSizeRecorder = requireNonNull(ioSizeRecorder, "ioSizeRecorder is null"); + } + + public synchronized void collectDeviceEntries( + SchemaFilter schemaFilter, Comparator<DeviceEntry> comparator, int partitionCount) { + checkNotClosed(); + retainFileReaderReferences(); + ExternalTsFileDeviceFilterVisitor deviceFilterVisitor = new ExternalTsFileDeviceFilterVisitor(); + try (DeviceCollector deviceCollector = new DeviceCollector()) { + List<DeviceTaskPartition> partitions = createDeviceTaskPartitions(partitionCount); + while (deviceCollector.hasNextDevice()) { + IDeviceID deviceID = deviceCollector.nextDevice(); + if (schemaFilter != null + && !Boolean.TRUE.equals(schemaFilter.accept(deviceFilterVisitor, deviceID))) { + continue; + } + DeviceEntry deviceEntry = new AlignedDeviceEntry(deviceID, new Binary[0]); + int deviceEntryIndex = deviceEntries.size(); + deviceEntries.add(deviceEntry); + DeviceTask deviceTask = + new DeviceTask( + deviceEntryIndex, new ArrayList<>(deviceCollector.getCurrentDeviceOffsets())); + DeviceTaskPartition partition = + partitions.get(Math.floorMod(deviceID.hashCode(), partitions.size())); + partition.add(deviceTask); + if (partition.getEstimatedSizeInBytes() >= DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES) { + partition.flush(comparator); + } + } + deviceEntryComparator = comparator; + collectDeviceTaskPartitions(partitions, comparator); + } + } + + public synchronized MultiWayMergeReader getMultiWayMergeReader(int partitionIndex) { + checkNotClosed(); + DeviceTaskPartition partition = getDeviceTaskPartition(partitionIndex); + try { + return new DeviceTaskRunReader(partition.getRunFiles(), deviceEntries, deviceEntryComparator); + } catch (IOException e) { + throw new RuntimeException("Failed to create external TsFile device task run reader", e); + } + } + + public List<String> getTsFilePaths() { + return tsFilePaths; + } + + public List<DeviceEntry> getDeviceEntries() { + return deviceEntries; + } + + public List<DeviceTaskPartition> getDeviceTaskPartitions() { + return deviceTaskPartitions; + } + + private DeviceTaskPartition getDeviceTaskPartition(int partitionIndex) { + for (DeviceTaskPartition partition : deviceTaskPartitions) { + if (partition.getPartitionIndex() == partitionIndex) { + return partition; + } + } + throw new IllegalArgumentException( + "Unknown external TsFile device task partition: " + partitionIndex); + } + + @Override + public synchronized void close() { + if (closed) { + return; + } + closed = true; + + releaseFileReaderReferences(); + + if (Files.exists(queryTempRoot)) { + FileUtils.deleteFileOrDirectory(queryTempRoot.toFile(), true); + } + } + + private void retainFileReaderReferences() { + if (readersRetained) { + return; + } + for (String tsFilePath : tsFilePaths) { + FileReaderManager.getInstance().increaseExternalFileReaderReference(tsFilePath); + } + readersRetained = true; + } + + private void releaseFileReaderReferences() { + if (!readersRetained) { + return; + } + for (String tsFilePath : tsFilePaths) { + FileReaderManager.getInstance().decreaseExternalFileReaderReference(tsFilePath); + } + readersRetained = false; + } + + private void checkNotClosed() { + if (closed) { + throw new IllegalStateException("External TsFile query resource has been closed: " + queryId); + } + } + + public interface MultiWayMergeReader extends AutoCloseable { + boolean hasNextDevice() throws IOException; + + DeviceEntry nextDevice() throws IOException; + + List<DeviceOffset> getCurrentDeviceOffsets(); + + @Override + void close() throws IOException; + } + + public class DeviceTaskPartition { + + private final int partitionIndex; + private final List<DeviceTask> pendingDeviceTasks = new ArrayList<>(); + private final List<Integer> deviceEntryIndexes = new ArrayList<>(); + private final List<Path> runFiles = new ArrayList<>(); + private long estimatedSizeInBytes; + + private DeviceTaskPartition(int partitionIndex) { + this.partitionIndex = partitionIndex; + } + + public int getPartitionIndex() { + return partitionIndex; + } + + public List<Integer> getDeviceEntryIndexes() { + return deviceEntryIndexes; + } + + private void add(DeviceTask deviceTask) { + pendingDeviceTasks.add(deviceTask); + estimatedSizeInBytes += estimateDeviceTaskSize(deviceTask); + } + + private void flush(Comparator<DeviceEntry> comparator) { + if (pendingDeviceTasks.isEmpty()) { + return; + } + sortPendingDeviceTasks(comparator); + try { + runFiles.add( + writeDeviceTaskRun( + queryTempRoot.resolve("child-" + partitionIndex), + runFiles.size(), + pendingDeviceTasks)); + } catch (IOException e) { + throw new RuntimeException("Failed to flush external TsFile device task partition", e); + } + for (DeviceTask deviceTask : pendingDeviceTasks) { + deviceEntryIndexes.add(deviceTask.deviceEntryIndex); + } + pendingDeviceTasks.clear(); + estimatedSizeInBytes = 0; + } + + private void sortPendingDeviceTasks(Comparator<DeviceEntry> comparator) { + if (comparator != null) { + pendingDeviceTasks.sort( + (left, right) -> + comparator.compare( + deviceEntries.get(left.deviceEntryIndex), + deviceEntries.get(right.deviceEntryIndex))); + } else { + pendingDeviceTasks.sort( + (left, right) -> + deviceEntries + .get(left.deviceEntryIndex) + .getDeviceID() + .compareTo(deviceEntries.get(right.deviceEntryIndex).getDeviceID())); + } + } + + private long getEstimatedSizeInBytes() { + return estimatedSizeInBytes; + } + + private boolean hasDeviceTasks() { + return !deviceEntryIndexes.isEmpty(); + } + + private void sortDeviceEntries(Comparator<DeviceEntry> comparator) { + if (comparator != null) { + deviceEntryIndexes.sort( + (left, right) -> comparator.compare(deviceEntries.get(left), deviceEntries.get(right))); + } else { + deviceEntryIndexes.sort( + (left, right) -> + deviceEntries + .get(left) + .getDeviceID() + .compareTo(deviceEntries.get(right).getDeviceID())); + } + } + + private List<Path> getRunFiles() { + return runFiles; + } + } + + private List<DeviceTaskPartition> createDeviceTaskPartitions(int partitionCount) { + if (partitionCount <= 0) { + throw new IllegalArgumentException( + "External TsFile device task partition count must be positive"); + } + List<DeviceTaskPartition> partitions = new ArrayList<>(partitionCount); + for (int i = 0; i < partitionCount; i++) { + partitions.add(new DeviceTaskPartition(i)); + } + return partitions; + } + + private void collectDeviceTaskPartitions( + List<DeviceTaskPartition> partitions, Comparator<DeviceEntry> comparator) { + if (partitions.isEmpty()) { + deviceTaskPartitions = Collections.emptyList(); + return; + } + List<DeviceTaskPartition> nonEmptyPartitions = new ArrayList<>(partitions.size()); + for (DeviceTaskPartition partition : partitions) { + partition.flush(comparator); + if (!partition.hasDeviceTasks()) { + continue; + } + partition.sortDeviceEntries(comparator); + nonEmptyPartitions.add(partition); + } + deviceTaskPartitions = nonEmptyPartitions; + } + + private Path writeDeviceTaskRun(Path runRoot, int runIndex, List<DeviceTask> deviceTasks) + throws IOException { + Files.createDirectories(runRoot); + Path runFile = runRoot.resolve("run-" + runIndex + ".bin"); + try (DataOutputStream outputStream = + new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(runFile)))) { + ReadWriteIOUtils.write(deviceTasks.size(), outputStream); + for (DeviceTask deviceTask : deviceTasks) { + deviceTask.serialize(outputStream); + } + } + return runFile; + } + + private static long estimateDeviceTaskSize(DeviceTask deviceTask) { + long size = 64L; + for (DeviceOffset offset : deviceTask.deviceOffsets) { + size += + DEVICE_OFFSET_INSTANCE_SIZE_IN_BYTES + + (long) Long.BYTES * offset.measurementNodeOffset.length; + } + return size; + } + + private static class DeviceTaskRunReader implements MultiWayMergeReader { + + private final List<DeviceEntry> deviceEntries; + private final PriorityQueue<DeviceTaskRunCursor> runCursors; + private DeviceTask nextDeviceTask; + private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList(); + + private DeviceTaskRunReader( + List<Path> runFiles, List<DeviceEntry> deviceEntries, Comparator<DeviceEntry> comparator) + throws IOException { + this.deviceEntries = deviceEntries; + Comparator<DeviceTaskRunCursor> cursorComparator = + (left, right) -> + comparator == null + ? left.getCurrentDeviceEntry() + .getDeviceID() + .compareTo(right.getCurrentDeviceEntry().getDeviceID()) + : comparator.compare(left.getCurrentDeviceEntry(), right.getCurrentDeviceEntry()); + this.runCursors = new PriorityQueue<>(cursorComparator); + for (Path runFile : runFiles) { + DeviceTaskRunCursor cursor = new DeviceTaskRunCursor(runFile, deviceEntries); + if (cursor.hasCurrentDeviceTask()) { + runCursors.add(cursor); + } else { + cursor.close(); + } + } + } + + @Override + public boolean hasNextDevice() throws IOException { + if (nextDeviceTask != null) { + return true; + } + nextDeviceTask = readNextDeviceTask(); + return nextDeviceTask != null; + } + + @Override + public DeviceEntry nextDevice() throws IOException { + if (!hasNextDevice()) { + throw new EOFException("No more external TsFile device task"); + } + DeviceTask deviceTask = nextDeviceTask; + nextDeviceTask = null; + currentDeviceOffsets = deviceTask.deviceOffsets; + return deviceEntries.get(deviceTask.deviceEntryIndex); + } + + @Override + public List<DeviceOffset> getCurrentDeviceOffsets() { + return currentDeviceOffsets; + } + + @Override + public void close() throws IOException { + IOException exception = null; + while (!runCursors.isEmpty()) { + try { + runCursors.poll().close(); + } catch (IOException e) { + if (exception == null) { + exception = e; + } else { + exception.addSuppressed(e); + } + } + } + if (exception != null) { + throw exception; + } + } + + private DeviceTask readNextDeviceTask() throws IOException { + if (runCursors.isEmpty()) { + return null; + } + DeviceTaskRunCursor cursor = runCursors.poll(); + DeviceTask result = cursor.getCurrentDeviceTask(); + cursor.advance(); + if (cursor.hasCurrentDeviceTask()) { + runCursors.add(cursor); + } else { + cursor.close(); + } + return result; + } + } + + private static class DeviceTaskRunCursor implements Closeable { + + private final List<DeviceEntry> deviceEntries; + private final DataInputStream inputStream; + private int remainingDeviceTasks; + private DeviceTask currentDeviceTask; + + private DeviceTaskRunCursor(Path runFile, List<DeviceEntry> deviceEntries) throws IOException { + this.deviceEntries = deviceEntries; + this.inputStream = + new DataInputStream(new BufferedInputStream(Files.newInputStream(runFile))); + this.remainingDeviceTasks = ReadWriteIOUtils.readInt(inputStream); + advance(); + } + + private void advance() throws IOException { + if (remainingDeviceTasks <= 0) { + currentDeviceTask = null; + return; + } + remainingDeviceTasks--; + currentDeviceTask = DeviceTask.deserialize(inputStream); + } + + private boolean hasCurrentDeviceTask() { + return currentDeviceTask != null; + } + + private DeviceTask getCurrentDeviceTask() { + return currentDeviceTask; + } + + private DeviceEntry getCurrentDeviceEntry() { + return deviceEntries.get(currentDeviceTask.deviceEntryIndex); + } + + @Override + public void close() throws IOException { + inputStream.close(); + } + } + + private class DeviceCollector implements Closeable { + + private final Map<Integer, LazyTsFileDeviceIterator> deviceIteratorMap = new HashMap<>(); + + private IDeviceID currentDevice; + private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList(); + + private DeviceCollector() { + try { + for (int fileIndex = 0; fileIndex < tsFilePaths.size(); fileIndex++) { + TsFileSequenceReader reader = + FileReaderManager.getInstance() + .get(tsFilePaths.get(fileIndex), null, true, ioSizeRecorder, true); + deviceIteratorMap.put( + fileIndex, new LazyTsFileDeviceIterator(reader, tableName, ioSizeRecorder)); + } + } catch (IOException e) { + close(); + throw new RuntimeException("Failed to create external TsFile device collector", e); + } + } + + private boolean hasNextDevice() { + for (LazyTsFileDeviceIterator deviceIterator : deviceIteratorMap.values()) { + if (deviceIterator.hasNext() + || (deviceIterator.hasCurrent() + && !deviceIterator.getCurrentDeviceID().equals(currentDevice))) { + return true; + } + } + return false; + } + + private IDeviceID nextDevice() { + IDeviceID minDevice = null; + Iterator<Map.Entry<Integer, LazyTsFileDeviceIterator>> iterator = + deviceIteratorMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Integer, LazyTsFileDeviceIterator> entry = iterator.next(); + LazyTsFileDeviceIterator deviceIterator = entry.getValue(); + IDeviceID currentFileDevice = null; + if (deviceIterator.hasCurrent()) { + currentFileDevice = deviceIterator.getCurrentDeviceID(); + } + if (currentFileDevice == null || currentFileDevice.equals(currentDevice)) { + if (deviceIterator.hasNext()) { + currentFileDevice = deviceIterator.next(); + } else { + iterator.remove(); + continue; + } + } + if (minDevice == null || minDevice.compareTo(currentFileDevice) > 0) { + minDevice = currentFileDevice; + } + } + currentDevice = minDevice; + collectCurrentDeviceOffsets(); + return currentDevice; + } + + private void collectCurrentDeviceOffsets() { + List<DeviceOffset> deviceOffsets = new ArrayList<>(); + for (Map.Entry<Integer, LazyTsFileDeviceIterator> entry : deviceIteratorMap.entrySet()) { + LazyTsFileDeviceIterator deviceIterator = entry.getValue(); + if (currentDevice != null + && deviceIterator.hasCurrent() + && currentDevice.equals(deviceIterator.getCurrentDeviceID())) { + deviceOffsets.add( + new DeviceOffset( + entry.getKey(), deviceIterator.getCurrentDeviceMeasurementNodeOffset())); + } + } + currentDeviceOffsets = deviceOffsets; + } + + private List<DeviceOffset> getCurrentDeviceOffsets() { + return currentDeviceOffsets; + } + + @Override + public void close() { + deviceIteratorMap.clear(); + currentDeviceOffsets = Collections.emptyList(); + } + } + + private static class DeviceTask { + + private final int deviceEntryIndex; + private final List<DeviceOffset> deviceOffsets; + + private DeviceTask(int deviceEntryIndex, List<DeviceOffset> deviceOffsets) { + this.deviceEntryIndex = deviceEntryIndex; + this.deviceOffsets = deviceOffsets; + } + + private void serialize(DataOutputStream outputStream) throws IOException { + ReadWriteIOUtils.write(deviceEntryIndex, outputStream); + ReadWriteIOUtils.write(deviceOffsets.size(), outputStream); + for (DeviceOffset offset : deviceOffsets) { + ReadWriteIOUtils.write(offset.fileIndex, outputStream); + ReadWriteIOUtils.write(offset.measurementNodeOffset.length, outputStream); + for (long measurementNodeOffset : offset.measurementNodeOffset) { + ReadWriteIOUtils.write(measurementNodeOffset, outputStream); + } + } + } + + private static DeviceTask deserialize(DataInputStream inputStream) throws IOException { + int deviceEntryIndex = ReadWriteIOUtils.readInt(inputStream); + int offsetSize = ReadWriteIOUtils.readInt(inputStream); + List<DeviceOffset> offsets = new ArrayList<>(offsetSize); + for (int i = 0; i < offsetSize; i++) { + int fileIndex = ReadWriteIOUtils.readInt(inputStream); + int measurementNodeOffsetLength = ReadWriteIOUtils.readInt(inputStream); + long[] measurementNodeOffset = new long[measurementNodeOffsetLength]; + for (int j = 0; j < measurementNodeOffsetLength; j++) { + measurementNodeOffset[j] = inputStream.readLong(); + } + offsets.add(new DeviceOffset(fileIndex, measurementNodeOffset)); + } + return new DeviceTask(deviceEntryIndex, offsets); + } + } + + public static class DeviceOffset { + + private final int fileIndex; + private final long[] measurementNodeOffset; + + private DeviceOffset(int fileIndex, long[] measurementNodeOffset) { + this.fileIndex = fileIndex; + this.measurementNodeOffset = measurementNodeOffset; + } + + public int getFileIndex() { + return fileIndex; + } + + public long[] getMeasurementNodeOffset() { + return measurementNodeOffset; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ReadTsFileTableFunction.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ReadTsFileTableFunction.java similarity index 95% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ReadTsFileTableFunction.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ReadTsFileTableFunction.java index 71ed1b3670f..26f7143595d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ReadTsFileTableFunction.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ReadTsFileTableFunction.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.queryengine.plan.relational.function.tvf; +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; @@ -118,7 +118,7 @@ public class ReadTsFileTableFunction implements TableFunction { public TableFunctionProcessorProvider getProcessorProvider( TableFunctionHandle tableFunctionHandle) { throw new UnsupportedOperationException( - "read_tsfile must be planned as an ExternalTsFileScanNode"); + "readTsFile must be planned as an ExternalTsFileScanNode"); } private static String getRequiredStringArgument(Map<String, Argument> arguments, String name) { @@ -172,7 +172,7 @@ public class ReadTsFileTableFunction implements TableFunction { if (normalizedTsFilePath.startsWith(dataDir) || dataDir.startsWith(normalizedTsFilePath)) { throw new UDFArgumentNotValidException( String.format( - "read_tsfile path %s is not allowed because it may access IoTDB data directory %s", + "readTsFile path %s is not allowed because it may access IoTDB data directory %s", tsFilePath, dataDir)); } } @@ -402,30 +402,6 @@ public class ReadTsFileTableFunction implements TableFunction { return builder.build(); } - public static class ExternalTsFileDeviceOffset { - - private final String tsFilePath; - private final long[] deviceMeasurementNodeOffset; - - public ExternalTsFileDeviceOffset(String tsFilePath, long[] deviceMeasurementNodeOffset) { - this.tsFilePath = tsFilePath; - this.deviceMeasurementNodeOffset = - deviceMeasurementNodeOffset == null - ? null - : Arrays.copyOf(deviceMeasurementNodeOffset, deviceMeasurementNodeOffset.length); - } - - public String getTsFilePath() { - return tsFilePath; - } - - public long[] getDeviceMeasurementNodeOffset() { - return deviceMeasurementNodeOffset == null - ? null - : Arrays.copyOf(deviceMeasurementNodeOffset, deviceMeasurementNodeOffset.length); - } - } - public static class ReadTsFileTableFunctionHandle implements TableFunctionHandle { private String tableName; private List<String> tsFilePaths; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index 5855b15922f..8fdf2c1aa1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -115,7 +115,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Scope; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.tablefunction.TableArgumentAnalysis; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.tablefunction.TableFunctionInvocationAnalysis; import org.apache.iotdb.db.queryengine.plan.relational.function.DataNodeTableBuiltinTableFunction; -import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.ReadTsFileTableFunction; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ReadTsFileTableFunction; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TreeDeviceViewSchema; import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils; @@ -1596,7 +1596,7 @@ public class RelationPlanner implements AstVisitor<RelationPlan, Void> { TableFunctionInvocation node, TableFunctionInvocationAnalysis functionAnalysis) { if (!(functionAnalysis.getTableFunctionHandle() instanceof ReadTsFileTableFunction.ReadTsFileTableFunctionHandle)) { - throw new IllegalStateException("read_tsfile table function handle is invalid"); + throw new IllegalStateException("readTsFile table function handle is invalid"); } ReadTsFileTableFunction.ReadTsFileTableFunctionHandle handle = @@ -1607,17 +1607,20 @@ public class RelationPlanner implements AstVisitor<RelationPlan, Void> { ImmutableList.Builder<Symbol> outputSymbolsBuilder = ImmutableList.builder(); ImmutableMap.Builder<Symbol, ColumnSchema> assignmentsBuilder = ImmutableMap.builder(); + Map<Symbol, Integer> tagAndAttributeIndexMap = new HashMap<>(); + int tagIndex = 0; for (int i = 0; i < relationType.getAllFieldCount(); i++) { Field field = relationType.getFieldByIndex(i); Symbol symbol = symbolAllocator.newSymbol(field); + TsTableColumnCategory columnCategory = handle.getOutputColumnCategories().get(i); outputSymbolsBuilder.add(symbol); assignmentsBuilder.put( symbol, new ColumnSchema( - field.getName().orElse(null), - field.getType(), - field.isHidden(), - handle.getOutputColumnCategories().get(i))); + field.getName().orElse(null), field.getType(), field.isHidden(), columnCategory)); + if (columnCategory == TsTableColumnCategory.TAG) { + tagAndAttributeIndexMap.put(symbol, tagIndex++); + } } List<Symbol> outputSymbols = outputSymbolsBuilder.build(); @@ -1626,16 +1629,17 @@ public class RelationPlanner implements AstVisitor<RelationPlan, Void> { createExternalTsFileQualifiedObjectName(handle.getTableName()); analysis.addTableSchema(qualifiedObjectName, assignments); - return new RelationPlan( + ExternalTsFileScanNode scanNode = new ExternalTsFileScanNode( idAllocator.genPlanNodeId(), qualifiedObjectName, outputSymbols, assignments, - handle.getTsFilePaths()), - scope, - outputSymbols, - outerContext); + tagAndAttributeIndexMap, + queryContext.createExternalTsFileQueryResource( + handle.getTableName(), handle.getTsFilePaths())); + + return new RelationPlan(scanNode, scope, outputSymbols, outerContext); } private QualifiedObjectName createExternalTsFileQualifiedObjectName(String tableName) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 105dc1c1935..0b54b2395a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -73,7 +73,6 @@ import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.SymbolRefere import org.apache.iotdb.commons.schema.table.InformationSchema; import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; -import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -88,6 +87,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.exceptions.RootFIPlacementEx import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceTaskPartition; import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; @@ -737,46 +737,22 @@ public class TableDistributedPlanGenerator new TRegionReplicaSet(null, ImmutableList.of(DataNodeEndPoints.getLocalDataNodeLocation())); node.setRegionReplicaSet(localRegionReplicaSet); context.mostUsedRegion = node.getRegionReplicaSet(); - List<PlanNode> resultNodes = - splitExternalTsFileScanByDeviceEntries(node, localRegionReplicaSet); - if (context.hasSortProperty) { - processSortProperty(node, resultNodes, context); - } - return resultNodes; - } - - private List<PlanNode> splitExternalTsFileScanByDeviceEntries( - final ExternalTsFileScanNode node, final TRegionReplicaSet localRegionReplicaSet) { - List<DeviceEntry> deviceEntries = node.getDeviceEntries(); - if (deviceEntries.size() <= 1) { - return Collections.singletonList(node); - } - - int splitCount = - Math.min( - deviceEntries.size(), - IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism()); - if (splitCount <= 1) { + Optional<SortPropertyContext> sortPropertyContext = + context.hasSortProperty ? analyzeSortProperty(node, context) : Optional.empty(); + int partitionCount = IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism(); + node.getExternalTsFileQueryResource() + .collectDeviceEntries( + node.getSchemaFilter(), + sortPropertyContext.map(propertyContext -> propertyContext.comparator).orElse(null), + partitionCount); + + List<DeviceTaskPartition> partitions = + node.getExternalTsFileQueryResource().getDeviceTaskPartitions(); + if (partitions.isEmpty()) { return Collections.singletonList(node); } - - List<List<DeviceEntry>> splitDeviceEntries = new ArrayList<>(splitCount); - List<List<List<ExternalTsFileDeviceOffset>>> splitDeviceOffsets = new ArrayList<>(splitCount); - for (int i = 0; i < splitCount; i++) { - splitDeviceEntries.add(new ArrayList<>()); - splitDeviceOffsets.add(new ArrayList<>()); - } - for (int i = 0; i < deviceEntries.size(); i++) { - splitDeviceEntries.get(i % splitCount).add(deviceEntries.get(i)); - splitDeviceOffsets.get(i % splitCount).add(node.getDeviceOffsets().get(i)); - } - - List<PlanNode> result = new ArrayList<>(splitCount); - for (int i = 0; i < splitDeviceEntries.size(); i++) { - List<DeviceEntry> entries = splitDeviceEntries.get(i); - if (entries.isEmpty()) { - continue; - } + List<PlanNode> result = new ArrayList<>(partitions.size()); + for (DeviceTaskPartition partition : partitions) { ExternalTsFileScanNode splitNode = new ExternalTsFileScanNode( queryId.genPlanNodeId(), @@ -789,12 +765,16 @@ public class TableDistributedPlanGenerator node.getTimePredicate().orElse(null), node.getScanOrder(), node.isPushLimitToEachDevice(), - node.getTsFilePaths(), - entries, - splitDeviceOffsets.get(i)); + node.getTagAndAttributeIndexMap(), + node.getExternalTsFileQueryResource(), + partition.getDeviceEntryIndexes(), + partition.getPartitionIndex(), + node.getSchemaFilter()); splitNode.setRegionReplicaSet(localRegionReplicaSet); result.add(splitNode); } + sortPropertyContext.ifPresent( + propertyContext -> applySortProperty(node, result, propertyContext, false)); return result; } @@ -805,53 +785,28 @@ public class TableDistributedPlanGenerator new TRegionReplicaSet(null, ImmutableList.of(DataNodeEndPoints.getLocalDataNodeLocation())); node.setRegionReplicaSet(localRegionReplicaSet); context.mostUsedRegion = node.getRegionReplicaSet(); - List<PlanNode> resultNodes = - splitExternalTsFileAggregationScanByDeviceEntries(node, localRegionReplicaSet); - if (context.hasSortProperty) { - processSortProperty(node, resultNodes, context); - } - return resultNodes; - } - - private List<PlanNode> splitExternalTsFileAggregationScanByDeviceEntries( - final ExternalTsFileAggregationScanNode node, final TRegionReplicaSet localRegionReplicaSet) { - List<DeviceEntry> deviceEntries = node.getDeviceEntries(); - if (deviceEntries.size() <= 1) { - return Collections.singletonList(node); - } - - int splitCount = - Math.min( - deviceEntries.size(), - IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism()); - if (splitCount <= 1) { + Optional<SortPropertyContext> sortPropertyContext = + context.hasSortProperty ? analyzeSortProperty(node, context) : Optional.empty(); + int partitionCount = IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism(); + node.getExternalTsFileQueryResource() + .collectDeviceEntries( + node.getSchemaFilter(), + sortPropertyContext.map(propertyContext -> propertyContext.comparator).orElse(null), + partitionCount); + + List<DeviceTaskPartition> partitions = + node.getExternalTsFileQueryResource().getDeviceTaskPartitions(); + if (partitions.isEmpty()) { return Collections.singletonList(node); } - - List<List<DeviceEntry>> splitDeviceEntries = new ArrayList<>(splitCount); - List<List<List<ExternalTsFileDeviceOffset>>> splitDeviceOffsets = new ArrayList<>(splitCount); - for (int i = 0; i < splitCount; i++) { - splitDeviceEntries.add(new ArrayList<>()); - splitDeviceOffsets.add(new ArrayList<>()); - } - for (int i = 0; i < deviceEntries.size(); i++) { - splitDeviceEntries.get(i % splitCount).add(deviceEntries.get(i)); - splitDeviceOffsets.get(i % splitCount).add(node.getDeviceOffsets().get(i)); - } - - List<PlanNode> result = new ArrayList<>(splitCount); - for (int i = 0; i < splitDeviceEntries.size(); i++) { - List<DeviceEntry> entries = splitDeviceEntries.get(i); - if (entries.isEmpty()) { - continue; - } + List<PlanNode> result = new ArrayList<>(partitions.size()); + for (DeviceTaskPartition partition : partitions) { ExternalTsFileAggregationScanNode splitNode = new ExternalTsFileAggregationScanNode( queryId.genPlanNodeId(), node.getQualifiedObjectName(), node.getOutputSymbols(), node.getAssignments(), - entries, node.getTagAndAttributeIndexMap(), node.getScanOrder(), node.getTimePredicate().orElse(null), @@ -866,11 +821,15 @@ public class TableDistributedPlanGenerator node.getPreGroupedSymbols(), node.getStep(), node.getGroupIdSymbol(), - node.getTsFilePaths(), - splitDeviceOffsets.get(i)); + node.getExternalTsFileQueryResource(), + partition.getDeviceEntryIndexes(), + partition.getPartitionIndex(), + node.getSchemaFilter()); splitNode.setRegionReplicaSet(localRegionReplicaSet); result.add(splitNode); } + sortPropertyContext.ifPresent( + propertyContext -> applySortProperty(node, result, propertyContext, false)); return result; } @@ -1949,18 +1908,26 @@ public class TableDistributedPlanGenerator final DeviceTableScanNode deviceTableScanNode, final List<PlanNode> resultTableScanNodeList, final PlanContext context) { + Optional<SortPropertyContext> sortPropertyContext = + analyzeSortProperty(deviceTableScanNode, context); + if (!sortPropertyContext.isPresent()) { + return; + } + applySortProperty( + deviceTableScanNode, resultTableScanNodeList, sortPropertyContext.get(), true); + } + + private Optional<SortPropertyContext> analyzeSortProperty( + DeviceTableScanNode deviceTableScanNode, PlanContext context) { final List<Symbol> newOrderingSymbols = new ArrayList<>(); final List<SortOrder> newSortOrders = new ArrayList<>(); final OrderingScheme expectedOrderingScheme = context.expectedOrderingScheme; boolean lastIsTimeRelated = false; + boolean scanOrderDesc = false; for (final Symbol symbol : expectedOrderingScheme.getOrderBy()) { if (timeRelatedSymbol(symbol, deviceTableScanNode)) { - if (!expectedOrderingScheme.getOrderings().get(symbol).isAscending()) { - // TODO(beyyes) move scan order judgement into logical plan optimizer - resultTableScanNodeList.forEach( - node -> ((DeviceTableScanNode) node).setScanOrder(Ordering.DESC)); - } + scanOrderDesc = !expectedOrderingScheme.getOrderings().get(symbol).isAscending(); newOrderingSymbols.add(symbol); newSortOrders.add(expectedOrderingScheme.getOrdering(symbol)); lastIsTimeRelated = true; @@ -1975,9 +1942,22 @@ public class TableDistributedPlanGenerator // no sort property can be pushed down into DeviceTableScanNode if (newOrderingSymbols.isEmpty()) { - return; + return Optional.empty(); } + return Optional.of( + new SortPropertyContext( + newOrderingSymbols, + newSortOrders, + createDeviceEntryComparator(deviceTableScanNode, newOrderingSymbols, newSortOrders), + lastIsTimeRelated, + scanOrderDesc)); + } + + private Comparator<DeviceEntry> createDeviceEntryComparator( + DeviceTableScanNode deviceTableScanNode, + List<Symbol> newOrderingSymbols, + List<SortOrder> newSortOrders) { Optional<IDeviceID.TreeDeviceIdColumnValueExtractor> extractor = createTreeDeviceIdColumnValueExtractor(deviceTableScanNode); final List<Function<DeviceEntry, String>> orderingRules = new ArrayList<>(); @@ -2052,21 +2032,34 @@ public class TableDistributedPlanGenerator comparator = comparator.thenComparing(thenComparator); } } + return comparator; + } + private void applySortProperty( + final DeviceTableScanNode deviceTableScanNode, + final List<PlanNode> resultTableScanNodeList, + final SortPropertyContext sortPropertyContext, + final boolean sortDeviceEntries) { final Optional<OrderingScheme> newOrderingScheme = tableScanOrderingSchema( analysis.getTableColumnSchema(deviceTableScanNode.getQualifiedObjectName()), deviceTableScanNode.getAssignments(), - newOrderingSymbols, - newSortOrders, - lastIsTimeRelated, - deviceTableScanNode.getDeviceEntries().size() == 1); + sortPropertyContext.orderingSymbols, + sortPropertyContext.sortOrders, + sortPropertyContext.lastIsTimeRelated, + resultTableScanNodeList.size() == 1 + && ((DeviceTableScanNode) resultTableScanNodeList.get(0)).getDeviceEntries().size() + == 1); for (final PlanNode planNode : resultTableScanNodeList) { final DeviceTableScanNode scanNode = (DeviceTableScanNode) planNode; + if (sortPropertyContext.scanOrderDesc) { + // TODO(beyyes) move scan order judgement into logical plan optimizer + scanNode.setScanOrder(Ordering.DESC); + } newOrderingScheme.ifPresent( orderingScheme -> nodeOrderingMap.put(scanNode.getPlanNodeId(), orderingScheme)); - if (comparator != null) { - scanNode.sortDeviceEntries(comparator); + if (sortDeviceEntries && sortPropertyContext.comparator != null) { + scanNode.getDeviceEntries().sort(sortPropertyContext.comparator); } } } @@ -2386,6 +2379,27 @@ public class TableDistributedPlanGenerator node.getOutputSymbols())); } + private static class SortPropertyContext { + final List<Symbol> orderingSymbols; + final List<SortOrder> sortOrders; + final Comparator<DeviceEntry> comparator; + final boolean lastIsTimeRelated; + final boolean scanOrderDesc; + + private SortPropertyContext( + List<Symbol> orderingSymbols, + List<SortOrder> sortOrders, + Comparator<DeviceEntry> comparator, + boolean lastIsTimeRelated, + boolean scanOrderDesc) { + this.orderingSymbols = orderingSymbols; + this.sortOrders = sortOrders; + this.comparator = comparator; + this.lastIsTimeRelated = lastIsTimeRelated; + this.scanOrderDesc = scanOrderDesc; + } + } + public static class PlanContext { final Map<PlanNodeId, NodeDistribution> nodeDistributionMap; boolean hasExchangeNode = false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java index a8a36c6328e..95a833074c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java @@ -103,9 +103,12 @@ public class PruneTableScanColumns extends ProjectOffPushDownRule<TableScanNode> externalTsFileScanNode.getPushDownOffset(), externalTsFileScanNode.getTimePredicate().orElse(null), externalTsFileScanNode.getScanOrder(), - externalTsFileScanNode.getTsFilePaths(), - externalTsFileScanNode.getDeviceEntries(), - externalTsFileScanNode.getDeviceOffsets()); + externalTsFileScanNode.isPushLimitToEachDevice(), + externalTsFileScanNode.getTagAndAttributeIndexMap(), + externalTsFileScanNode.getExternalTsFileQueryResource(), + externalTsFileScanNode.getDeviceEntryIndexes(), + externalTsFileScanNode.getDeviceTaskPartitionIndex(), + externalTsFileScanNode.getSchemaFilter()); prunedNode.setRegionReplicaSet(externalTsFileScanNode.getRegionReplicaSet()); return Optional.of(prunedNode); } else if (node instanceof DeviceTableScanNode) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java index 19057bc7146..9402e837575 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java @@ -297,28 +297,31 @@ public class AggregationTableScanNode extends DeviceTableScanNode { DeviceTableScanNode tableScanNode) { if (tableScanNode instanceof ExternalTsFileScanNode) { ExternalTsFileScanNode externalTsFileScanNode = (ExternalTsFileScanNode) tableScanNode; - return new ExternalTsFileAggregationScanNode( - id, - tableScanNode.getQualifiedObjectName(), - tableScanNode.getOutputSymbols(), - tableScanNode.getAssignments(), - tableScanNode.getDeviceEntries(), - tableScanNode.getTagAndAttributeIndexMap(), - tableScanNode.getScanOrder(), - tableScanNode.getTimePredicate().orElse(null), - tableScanNode.getPushDownPredicate(), - tableScanNode.getPushDownLimit(), - tableScanNode.getPushDownOffset(), - tableScanNode.isPushLimitToEachDevice(), - tableScanNode.containsNonAlignedDevice(), - projectNode == null ? null : projectNode.getAssignments(), - aggregationNode.getAggregations(), - aggregationNode.getGroupingSets(), - aggregationNode.getPreGroupedSymbols(), - aggregationNode.getStep(), - aggregationNode.getGroupIdSymbol(), - externalTsFileScanNode.getTsFilePaths(), - externalTsFileScanNode.getDeviceOffsets()); + ExternalTsFileAggregationScanNode scanNode = + new ExternalTsFileAggregationScanNode( + id, + tableScanNode.getQualifiedObjectName(), + tableScanNode.getOutputSymbols(), + tableScanNode.getAssignments(), + tableScanNode.getTagAndAttributeIndexMap(), + tableScanNode.getScanOrder(), + tableScanNode.getTimePredicate().orElse(null), + tableScanNode.getPushDownPredicate(), + tableScanNode.getPushDownLimit(), + tableScanNode.getPushDownOffset(), + tableScanNode.isPushLimitToEachDevice(), + tableScanNode.containsNonAlignedDevice(), + projectNode == null ? null : projectNode.getAssignments(), + aggregationNode.getAggregations(), + aggregationNode.getGroupingSets(), + aggregationNode.getPreGroupedSymbols(), + aggregationNode.getStep(), + aggregationNode.getGroupIdSymbol(), + externalTsFileScanNode.getExternalTsFileQueryResource(), + externalTsFileScanNode.getDeviceEntryIndexes(), + externalTsFileScanNode.getDeviceTaskPartitionIndex(), + externalTsFileScanNode.getSchemaFilter()); + return scanNode; } if (tableScanNode instanceof TreeDeviceViewScanNode) { TreeDeviceViewScanNode treeDeviceViewScanNode = (TreeDeviceViewScanNode) tableScanNode; @@ -381,7 +384,6 @@ public class AggregationTableScanNode extends DeviceTableScanNode { tableScanNode.getQualifiedObjectName(), tableScanNode.getOutputSymbols(), tableScanNode.getAssignments(), - tableScanNode.getDeviceEntries(), tableScanNode.getTagAndAttributeIndexMap(), tableScanNode.getScanOrder(), tableScanNode.getTimePredicate().orElse(null), @@ -396,8 +398,10 @@ public class AggregationTableScanNode extends DeviceTableScanNode { aggregationNode.getPreGroupedSymbols(), step, aggregationNode.getGroupIdSymbol(), - externalTsFileScanNode.getTsFilePaths(), - externalTsFileScanNode.getDeviceOffsets()); + externalTsFileScanNode.getExternalTsFileQueryResource(), + externalTsFileScanNode.getDeviceEntryIndexes(), + externalTsFileScanNode.getDeviceTaskPartitionIndex(), + externalTsFileScanNode.getSchemaFilter()); } if (tableScanNode instanceof TreeDeviceViewScanNode) { TreeDeviceViewScanNode treeDeviceViewScanNode = (TreeDeviceViewScanNode) tableScanNode; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java index e4fceb9818b..c02c74058af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java @@ -41,7 +41,6 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -276,10 +275,6 @@ public class DeviceTableScanNode extends TableScanNode { this.deviceEntries.add(deviceEntry); } - public void sortDeviceEntries(Comparator<DeviceEntry> comparator) { - this.deviceEntries.sort(comparator); - } - public void setPushLimitToEachDevice(boolean pushLimitToEachDevice) { this.pushLimitToEachDevice = pushLimitToEachDevice; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java index 04434ad498e..dd338dc2d90 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java @@ -27,32 +27,32 @@ import org.apache.iotdb.commons.queryengine.plan.relational.planner.Assignments; import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression; -import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; +import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import com.google.common.collect.Lists; + import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.IntStream; public class ExternalTsFileAggregationScanNode extends AggregationTableScanNode { - private List<String> tsFilePaths; - private List<List<ExternalTsFileDeviceOffset>> deviceOffsets = Collections.emptyList(); + private final ExternalTsFileQueryResource externalTsFileQueryResource; + private List<Integer> deviceEntryIndexes; + private int deviceTaskPartitionIndex = -1; + private SchemaFilter schemaFilter; public ExternalTsFileAggregationScanNode( PlanNodeId id, QualifiedObjectName qualifiedObjectName, List<Symbol> outputSymbols, Map<Symbol, ColumnSchema> assignments, - List<DeviceEntry> deviceEntries, Map<Symbol, Integer> tagAndAttributeIndexMap, Ordering scanOrder, Expression timePredicate, @@ -67,14 +67,16 @@ public class ExternalTsFileAggregationScanNode extends AggregationTableScanNode List<Symbol> preGroupedSymbols, AggregationNode.Step step, Optional<Symbol> groupIdSymbol, - List<String> tsFilePaths, - List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { + ExternalTsFileQueryResource externalTsFileQueryResource, + List<Integer> deviceEntryIndexes, + int deviceTaskPartitionIndex, + SchemaFilter schemaFilter) { super( id, qualifiedObjectName, outputSymbols, assignments, - deviceEntries, + Lists.transform(deviceEntryIndexes, externalTsFileQueryResource.getDeviceEntries()::get), tagAndAttributeIndexMap, scanOrder, timePredicate, @@ -89,12 +91,12 @@ public class ExternalTsFileAggregationScanNode extends AggregationTableScanNode preGroupedSymbols, step, groupIdSymbol); - this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(tsFilePaths)); - this.deviceOffsets = copyDeviceOffsets(deviceOffsets); + this.externalTsFileQueryResource = externalTsFileQueryResource; + this.deviceEntryIndexes = deviceEntryIndexes; + this.deviceTaskPartitionIndex = deviceTaskPartitionIndex; + this.schemaFilter = schemaFilter; } - protected ExternalTsFileAggregationScanNode() {} - @Override public <R, C> R accept(IPlanVisitor<R, C> visitor, C context) { return ((PlanVisitor<R, C>) visitor).visitExternalTsFileAggregationScan(this, context); @@ -107,7 +109,6 @@ public class ExternalTsFileAggregationScanNode extends AggregationTableScanNode qualifiedObjectName, outputSymbols, assignments, - deviceEntries, tagAndAttributeIndexMap, scanOrder, timePredicate, @@ -122,47 +123,40 @@ public class ExternalTsFileAggregationScanNode extends AggregationTableScanNode preGroupedSymbols, step, groupIdSymbol, - tsFilePaths, - deviceOffsets); + externalTsFileQueryResource, + deviceEntryIndexes, + deviceTaskPartitionIndex, + schemaFilter); } public List<String> getTsFilePaths() { - return tsFilePaths; + return externalTsFileQueryResource.getTsFilePaths(); + } + + public ExternalTsFileQueryResource getExternalTsFileQueryResource() { + return externalTsFileQueryResource; + } + + public List<Integer> getDeviceEntryIndexes() { + return deviceEntryIndexes; } - public List<List<ExternalTsFileDeviceOffset>> getDeviceOffsets() { - return deviceOffsets; + public int getDeviceTaskPartitionIndex() { + return deviceTaskPartitionIndex; } @Override - public void sortDeviceEntries(Comparator<DeviceEntry> comparator) { - int[] indexes = - IntStream.range(0, deviceEntries.size()) - .boxed() - .sorted( - (left, right) -> - comparator.compare(deviceEntries.get(left), deviceEntries.get(right))) - .mapToInt(Integer::intValue) - .toArray(); - List<DeviceEntry> sortedDeviceEntries = new ArrayList<>(deviceEntries.size()); - List<List<ExternalTsFileDeviceOffset>> sortedDeviceOffsets = - new ArrayList<>(deviceOffsets.size()); - for (int index : indexes) { - sortedDeviceEntries.add(deviceEntries.get(index)); - sortedDeviceOffsets.add(deviceOffsets.get(index)); - } - this.deviceEntries = sortedDeviceEntries; - this.deviceOffsets = sortedDeviceOffsets; + public void setDeviceEntries(List<DeviceEntry> deviceEntries) { + throw new UnsupportedOperationException( + "ExternalTsFileAggregationScanNode device entries must be set by device entry indexes"); + } + + public SchemaFilter getSchemaFilter() { + return schemaFilter; } - private static List<List<ExternalTsFileDeviceOffset>> copyDeviceOffsets( - List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { - List<List<ExternalTsFileDeviceOffset>> copiedDeviceOffsets = - new ArrayList<>(deviceOffsets.size()); - for (List<ExternalTsFileDeviceOffset> offsets : deviceOffsets) { - copiedDeviceOffsets.add(new ArrayList<>(offsets)); - } - return copiedDeviceOffsets; + public void setSchemaFilter(SchemaFilter schemaFilter) { + this.schemaFilter = schemaFilter; } @Override @@ -177,11 +171,6 @@ public class ExternalTsFileAggregationScanNode extends AggregationTableScanNode "ExternalTsFileAggregationScanNode cannot be serialized because it reads local external TsFiles"); } - public static ExternalTsFileAggregationScanNode deserialize(ByteBuffer byteBuffer) { - throw new UnsupportedOperationException( - "ExternalTsFileAggregationScanNode cannot be deserialized because it reads local external TsFiles"); - } - @Override public String toString() { return "ExternalTsFileAggregationScanNode-" + this.getPlanNodeId(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java index f21a7a755c0..be83256c404 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java @@ -25,26 +25,26 @@ import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchem import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression; -import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; +import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import com.google.common.collect.Lists; + import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.stream.IntStream; - -import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TAG; public class ExternalTsFileScanNode extends DeviceTableScanNode { - private List<String> tsFilePaths; - private List<List<ExternalTsFileDeviceOffset>> deviceOffsets = Collections.emptyList(); + private ExternalTsFileQueryResource externalTsFileQueryResource; + private List<Integer> deviceEntryIndexes = Collections.emptyList(); + private int deviceTaskPartitionIndex = -1; + private SchemaFilter schemaFilter; protected ExternalTsFileScanNode() {} @@ -53,133 +53,10 @@ public class ExternalTsFileScanNode extends DeviceTableScanNode { QualifiedObjectName qualifiedObjectName, List<Symbol> outputSymbols, Map<Symbol, ColumnSchema> assignments, - List<String> tsFilePaths) { - this( - id, - qualifiedObjectName, - outputSymbols, - assignments, - tsFilePaths, - Collections.emptyList(), - Collections.emptyList()); - } - - public ExternalTsFileScanNode( - PlanNodeId id, - QualifiedObjectName qualifiedObjectName, - List<Symbol> outputSymbols, - Map<Symbol, ColumnSchema> assignments, - List<String> tsFilePaths, - List<DeviceEntry> deviceEntries, - List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { - super(id, qualifiedObjectName, outputSymbols, assignments, buildTagIndexMap(assignments)); - this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(tsFilePaths)); - this.deviceEntries = new ArrayList<>(deviceEntries); - this.deviceOffsets = copyDeviceOffsets(deviceOffsets); - } - - public ExternalTsFileScanNode( - PlanNodeId id, - QualifiedObjectName qualifiedObjectName, - List<Symbol> outputSymbols, - Map<Symbol, ColumnSchema> assignments, - Expression pushDownPredicate, - long pushDownLimit, - long pushDownOffset, - Ordering scanOrder, - List<String> tsFilePaths) { - this( - id, - qualifiedObjectName, - outputSymbols, - assignments, - pushDownPredicate, - pushDownLimit, - pushDownOffset, - scanOrder, - tsFilePaths, - Collections.emptyList()); - } - - public ExternalTsFileScanNode( - PlanNodeId id, - QualifiedObjectName qualifiedObjectName, - List<Symbol> outputSymbols, - Map<Symbol, ColumnSchema> assignments, - Expression pushDownPredicate, - long pushDownLimit, - long pushDownOffset, - Ordering scanOrder, - List<String> tsFilePaths, - List<DeviceEntry> deviceEntries) { - this( - id, - qualifiedObjectName, - outputSymbols, - assignments, - pushDownPredicate, - pushDownLimit, - pushDownOffset, - null, - scanOrder, - tsFilePaths, - deviceEntries, - Collections.emptyList()); - } - - public ExternalTsFileScanNode( - PlanNodeId id, - QualifiedObjectName qualifiedObjectName, - List<Symbol> outputSymbols, - Map<Symbol, ColumnSchema> assignments, - Expression pushDownPredicate, - long pushDownLimit, - long pushDownOffset, - Expression timePredicate, - Ordering scanOrder, - List<String> tsFilePaths) { - this( - id, - qualifiedObjectName, - outputSymbols, - assignments, - pushDownPredicate, - pushDownLimit, - pushDownOffset, - timePredicate, - scanOrder, - tsFilePaths, - Collections.emptyList(), - Collections.emptyList()); - } - - public ExternalTsFileScanNode( - PlanNodeId id, - QualifiedObjectName qualifiedObjectName, - List<Symbol> outputSymbols, - Map<Symbol, ColumnSchema> assignments, - Expression pushDownPredicate, - long pushDownLimit, - long pushDownOffset, - Expression timePredicate, - Ordering scanOrder, - List<String> tsFilePaths, - List<DeviceEntry> deviceEntries, - List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { - this( - id, - qualifiedObjectName, - outputSymbols, - assignments, - pushDownPredicate, - pushDownLimit, - pushDownOffset, - timePredicate, - scanOrder, - false, - tsFilePaths, - deviceEntries, - deviceOffsets); + Map<Symbol, Integer> tagAndAttributeIndexMap, + ExternalTsFileQueryResource externalTsFileQueryResource) { + super(id, qualifiedObjectName, outputSymbols, assignments, tagAndAttributeIndexMap); + this.externalTsFileQueryResource = externalTsFileQueryResource; } public ExternalTsFileScanNode( @@ -193,16 +70,18 @@ public class ExternalTsFileScanNode extends DeviceTableScanNode { Expression timePredicate, Ordering scanOrder, boolean pushLimitToEachDevice, - List<String> tsFilePaths, - List<DeviceEntry> deviceEntries, - List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { + Map<Symbol, Integer> tagAndAttributeIndexMap, + ExternalTsFileQueryResource externalTsFileQueryResource, + List<Integer> deviceEntryIndexes, + int deviceTaskPartitionIndex, + SchemaFilter schemaFilter) { super( id, qualifiedObjectName, outputSymbols, assignments, - new ArrayList<>(deviceEntries), - buildTagIndexMap(assignments), + Lists.transform(deviceEntryIndexes, externalTsFileQueryResource.getDeviceEntries()::get), + tagAndAttributeIndexMap, scanOrder, timePredicate, pushDownPredicate, @@ -210,8 +89,10 @@ public class ExternalTsFileScanNode extends DeviceTableScanNode { pushDownOffset, pushLimitToEachDevice, false); - this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(tsFilePaths)); - this.deviceOffsets = copyDeviceOffsets(deviceOffsets); + this.externalTsFileQueryResource = externalTsFileQueryResource; + this.deviceEntryIndexes = deviceEntryIndexes; + this.deviceTaskPartitionIndex = deviceTaskPartitionIndex; + this.schemaFilter = schemaFilter; } @Override @@ -232,52 +113,41 @@ public class ExternalTsFileScanNode extends DeviceTableScanNode { timePredicate, scanOrder, pushLimitToEachDevice, - tsFilePaths, - deviceEntries, - deviceOffsets); + tagAndAttributeIndexMap, + externalTsFileQueryResource, + deviceEntryIndexes, + deviceTaskPartitionIndex, + schemaFilter); } public List<String> getTsFilePaths() { - return tsFilePaths; + return externalTsFileQueryResource.getTsFilePaths(); + } + + public ExternalTsFileQueryResource getExternalTsFileQueryResource() { + return externalTsFileQueryResource; } - public List<List<ExternalTsFileDeviceOffset>> getDeviceOffsets() { - return deviceOffsets; + public List<Integer> getDeviceEntryIndexes() { + return deviceEntryIndexes; } - public void setDeviceOffsets(List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { - this.deviceOffsets = copyDeviceOffsets(deviceOffsets); + public int getDeviceTaskPartitionIndex() { + return deviceTaskPartitionIndex; } @Override - public void sortDeviceEntries(Comparator<DeviceEntry> comparator) { - int[] indexes = - IntStream.range(0, deviceEntries.size()) - .boxed() - .sorted( - (left, right) -> - comparator.compare(deviceEntries.get(left), deviceEntries.get(right))) - .mapToInt(Integer::intValue) - .toArray(); - List<DeviceEntry> sortedDeviceEntries = new ArrayList<>(deviceEntries.size()); - List<List<ExternalTsFileDeviceOffset>> sortedDeviceOffsets = - new ArrayList<>(deviceOffsets.size()); - for (int index : indexes) { - sortedDeviceEntries.add(deviceEntries.get(index)); - sortedDeviceOffsets.add(deviceOffsets.get(index)); - } - this.deviceEntries = sortedDeviceEntries; - this.deviceOffsets = sortedDeviceOffsets; + public void setDeviceEntries(List<DeviceEntry> deviceEntries) { + throw new UnsupportedOperationException( + "ExternalTsFileScanNode device entries must be set by device entry indexes"); + } + + public SchemaFilter getSchemaFilter() { + return schemaFilter; } - private static List<List<ExternalTsFileDeviceOffset>> copyDeviceOffsets( - List<List<ExternalTsFileDeviceOffset>> deviceOffsets) { - List<List<ExternalTsFileDeviceOffset>> copiedDeviceOffsets = - new ArrayList<>(deviceOffsets.size()); - for (List<ExternalTsFileDeviceOffset> offsets : deviceOffsets) { - copiedDeviceOffsets.add(new ArrayList<>(offsets)); - } - return copiedDeviceOffsets; + public void setSchemaFilter(SchemaFilter schemaFilter) { + this.schemaFilter = schemaFilter; } @Override @@ -292,24 +162,8 @@ public class ExternalTsFileScanNode extends DeviceTableScanNode { "ExternalTsFileScanNode cannot be serialized because it reads local external TsFiles"); } - public static ExternalTsFileScanNode deserialize(ByteBuffer byteBuffer) { - throw new UnsupportedOperationException( - "ExternalTsFileScanNode cannot be deserialized because it reads local external TsFiles"); - } - @Override public String toString() { return "ExternalTsFileScanNode-" + this.getPlanNodeId(); } - - private static Map<Symbol, Integer> buildTagIndexMap(Map<Symbol, ColumnSchema> assignments) { - Map<Symbol, Integer> tagIndexMap = new java.util.HashMap<>(); - int tagIndex = 0; - for (Map.Entry<Symbol, ColumnSchema> entry : assignments.entrySet()) { - if (TAG.equals(entry.getValue().getColumnCategory())) { - tagIndexMap.put(entry.getKey(), tagIndex++); - } - } - return tagIndexMap; - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index af6ddc5db1c..5ffac0473ba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -53,12 +53,10 @@ import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.commons.schema.table.InformationSchema; import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TagColumnSchema; -import org.apache.iotdb.commons.udf.builtin.relational.tvf.ReadTsFileTableFunction.ExternalTsFileDeviceOffset; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; -import org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileDeviceFilterVisitor; import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; @@ -68,7 +66,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.Conver import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicateCombineIntoTableScanChecker; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicatePushIntoMetadataChecker; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.schema.ConvertSchemaPredicateToFilterVisitor; -import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.metadata.NonAlignedDeviceEntry; @@ -87,24 +84,18 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceVi import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.read.LazyTsFileDeviceIterator; -import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.common.type.Type; import org.apache.tsfile.read.filter.basic.Filter; -import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; import javax.annotation.Nullable; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -560,43 +551,8 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { private void collectExternalTsFileDeviceTasks( ExternalTsFileScanNode tableScanNode, List<Expression> metadataExpressions) { - SchemaFilter deviceFilter = - constructExternalTsFileDeviceFilter(tableScanNode, metadataExpressions); - ExternalTsFileDeviceFilterVisitor deviceFilterVisitor = - new ExternalTsFileDeviceFilterVisitor(); - Map<IDeviceID, List<ExternalTsFileDeviceOffset>> deviceOffsetMap = new LinkedHashMap<>(); - for (String tsFilePath : tableScanNode.getTsFilePaths()) { - try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFilePath)) { - LazyTsFileDeviceIterator deviceIterator = - new LazyTsFileDeviceIterator( - reader, tableScanNode.getQualifiedObjectName().getObjectName(), ignored -> {}); - while (deviceIterator.hasNext()) { - IDeviceID deviceID = deviceIterator.next(); - if (deviceFilter != null - && !Boolean.TRUE.equals(deviceFilter.accept(deviceFilterVisitor, deviceID))) { - continue; - } - deviceOffsetMap - .computeIfAbsent(deviceID, ignored -> new ArrayList<>()) - .add( - new ExternalTsFileDeviceOffset( - tsFilePath, deviceIterator.getCurrentDeviceMeasurementNodeOffset())); - } - } catch (IOException e) { - throw new SemanticException( - "Failed to collect devices from external TsFile: " + tsFilePath); - } - } - List<DeviceEntry> deviceEntries = new ArrayList<>(deviceOffsetMap.size()); - List<List<ExternalTsFileDeviceOffset>> deviceOffsets = - new ArrayList<>(deviceOffsetMap.size()); - for (Map.Entry<IDeviceID, List<ExternalTsFileDeviceOffset>> entry : - deviceOffsetMap.entrySet()) { - deviceEntries.add(new AlignedDeviceEntry(entry.getKey(), new Binary[0])); - deviceOffsets.add(entry.getValue()); - } - tableScanNode.setDeviceEntries(deviceEntries); - tableScanNode.setDeviceOffsets(deviceOffsets); + tableScanNode.setSchemaFilter( + constructExternalTsFileDeviceFilter(tableScanNode, metadataExpressions)); } private SchemaFilter constructExternalTsFileDeviceFilter( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index b72b05fbff5..d11a012a7d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -298,9 +298,12 @@ public class UnaliasSymbolReferences implements PlanOptimizer { node.getPushDownOffset(), node.getTimePredicate().map(mapper::map).orElse(null), node.getScanOrder(), - node.getTsFilePaths(), - node.getDeviceEntries(), - node.getDeviceOffsets()); + node.isPushLimitToEachDevice(), + node.getTagAndAttributeIndexMap(), + node.getExternalTsFileQueryResource(), + node.getDeviceEntryIndexes(), + node.getDeviceTaskPartitionIndex(), + node.getSchemaFilter()); rewrittenNode.setRegionReplicaSet(node.getRegionReplicaSet()); return new PlanAndMappings(rewrittenNode, mapping); }
