This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch table_disk_usage_statistics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1699e442cd5e9dd32c632a7f98f999e128ac5c77 Author: shuwenwei <[email protected]> AuthorDate: Thu Nov 13 11:32:06 2025 +0800 remove showDiskUsageStatement from table model --- .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 7 +- .../operator/source/ShowDiskUsageOperator.java | 63 ++++++++--- .../InformationSchemaContentSupplierFactory.java | 60 +++++----- .../queryengine/plan/analyze/AnalyzeVisitor.java | 42 +++++-- .../plan/optimization/LimitOffsetPushDown.java | 32 +++++- .../plan/optimization/PredicatePushDown.java | 56 +++++++++- .../db/queryengine/plan/parser/ASTVisitor.java | 32 ++++-- .../plan/planner/LogicalPlanBuilder.java | 46 +++++++- .../plan/planner/LogicalPlanVisitor.java | 6 +- .../plan/planner/OperatorTreeGenerator.java | 33 +++++- .../plan/planner/TableOperatorGenerator.java | 1 + .../planner/distribution/ExchangeNodeAdder.java | 12 +- .../plan/planner/distribution/SourceRewriter.java | 13 +++ .../plan/planner/plan/node/PlanGraphPrinter.java | 9 ++ .../plan/planner/plan/node/PlanNodeType.java | 4 + .../plan/planner/plan/node/PlanVisitor.java | 5 + .../planner/plan/node/process/TreeCollectNode.java | 121 +++++++++++++++++++++ .../plan/node/source/ShowDiskUsageNode.java | 85 ++++++++++++++- .../planner/plan/node/source/ShowQueriesNode.java | 2 +- .../plan/relational/analyzer/Analyzer.java | 3 +- .../plan/relational/planner/RelationPlanner.java | 4 - .../plan/relational/sql/ast/AstVisitor.java | 4 - .../relational/sql/ast/ShowDiskUsageOfTable.java | 40 ------- .../plan/relational/sql/parser/AstBuilder.java | 33 ------ .../plan/relational/sql/rewrite/ShowRewrite.java | 53 ++------- .../relational/sql/rewrite/StatementRewrite.java | 15 +-- .../plan/statement/sys/ShowDiskUsageStatement.java | 22 +++- .../execute/utils/MultiTsFileDeviceIterator.java | 3 +- .../compaction/repair/RepairDataFileScanUtil.java | 2 +- .../dataregion/utils/DiskUsageStatisticUtil.java | 62 +++++++++-- .../utils/TableDiskUsageStatisticUtil.java | 51 +++++---- 
.../utils/TreeDiskUsageStatisticUtil.java | 69 ++++++------ .../iotdb/db/utils/datastructure/TVList.java | 2 +- ...anNodeSerdeTest.java => PlanNodeSerdeTest.java} | 21 +++- .../schema/column/ColumnHeaderConstant.java | 3 + .../commons/schema/table/InformationSchema.java | 4 - .../db/relational/grammar/sql/RelationalSql.g4 | 8 +- 37 files changed, 718 insertions(+), 310 deletions(-) diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index 38b20a38d56..0d7337503bb 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -959,10 +959,6 @@ sortKey : TIME | TIMESERIES | DEVICE - | QUERYID - | DATANODEID - | ELAPSEDTIME - | STATEMENT ; // ---- Fill Clause @@ -1235,6 +1231,9 @@ showQueries showDiskUsage : SHOW DISK_USAGE FROM prefixPath + whereClause? + orderByClause? + rowPaginationClause? 
; // Show Current Timestamp diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java index 07272573207..8c51eb8dbb1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java @@ -30,9 +30,13 @@ import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.utils.StorageEngineTimePartitionIterator; import org.apache.iotdb.db.storageengine.dataregion.utils.TreeDiskUsageStatisticUtil; +import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.filter.basic.Filter; +import org.apache.tsfile.read.reader.series.PaginationController; +import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.RamUsageEstimator; import java.util.NoSuchElementException; @@ -48,15 +52,22 @@ public class ShowDiskUsageOperator implements SourceOperator { private final PlanNodeId sourceId; private final PartialPath pathPattern; private final StorageEngineTimePartitionIterator timePartitionIterator; + private final PaginationController paginationController; + private final TsBlockBuilder tsBlockBuilder = + new TsBlockBuilder(DatasetHeaderFactory.getShowDiskUsageHeader().getRespDataTypes()); private TreeDiskUsageStatisticUtil statisticUtil; private boolean allConsumed = false; - private long result = 0; public ShowDiskUsageOperator( - OperatorContext operatorContext, PlanNodeId sourceId, PartialPath pathPattern) { + OperatorContext operatorContext, + PlanNodeId 
sourceId, + PartialPath pathPattern, + Filter pushDownFilter, + PaginationController paginationController) { this.operatorContext = operatorContext; this.sourceId = sourceId; this.pathPattern = pathPattern; + this.paginationController = paginationController; this.timePartitionIterator = new StorageEngineTimePartitionIterator( Optional.of( @@ -65,7 +76,24 @@ public class ShowDiskUsageOperator implements SourceOperator { return !PathUtils.isTableModelDatabase(databaseName) && pathPattern.matchPrefixPath(new PartialPath(databaseName)); }), - Optional.empty()); + Optional.of( + (dataRegion, timePartition) -> { + if (pushDownFilter != null) { + Object[] row = new Object[4]; + row[0] = new Binary(dataRegion.getDatabaseName(), TSFileConfig.STRING_CHARSET); + row[1] = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); + row[2] = Integer.parseInt(dataRegion.getDataRegionId()); + row[3] = timePartition; + if (!pushDownFilter.satisfyRow(0, row)) { + return false; + } + } + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + return false; + } + return paginationController.hasCurLimit(); + })); } @Override @@ -91,15 +119,31 @@ public class ShowDiskUsageOperator implements SourceOperator { continue; } if (statisticUtil != null) { - result += statisticUtil.getResult()[0]; + tsBlockBuilder.getTimeColumnBuilder().writeLong(0); + tsBlockBuilder.getValueColumnBuilders()[0].writeBinary( + new Binary( + timePartitionIterator.currentDataRegion().getDatabaseName(), + TSFileConfig.STRING_CHARSET)); + tsBlockBuilder.getValueColumnBuilders()[1].writeInt( + IoTDBDescriptor.getInstance().getConfig().getDataNodeId()); + tsBlockBuilder.getValueColumnBuilders()[2].writeInt( + Integer.parseInt(timePartitionIterator.currentDataRegion().getDataRegionId())); + tsBlockBuilder.getValueColumnBuilders()[3].writeLong( + timePartitionIterator.currentTimePartition()); + tsBlockBuilder.getValueColumnBuilders()[4].writeLong(statisticUtil.getResult()[0]); + 
tsBlockBuilder.declarePosition(); + paginationController.consumeLimit(); statisticUtil.close(); } - if (timePartitionIterator.next()) { + if (paginationController.hasCurLimit() && timePartitionIterator.next()) { DataRegion dataRegion = timePartitionIterator.currentDataRegion(); long timePartition = timePartitionIterator.currentTimePartition(); statisticUtil = new TreeDiskUsageStatisticUtil( - dataRegion.getTsFileManager(), timePartition, pathPattern); + dataRegion.getTsFileManager(), + timePartition, + pathPattern, + Optional.ofNullable(operatorContext.getInstanceContext())); } else { allConsumed = true; } @@ -108,13 +152,6 @@ public class ShowDiskUsageOperator implements SourceOperator { if (!allConsumed) { return null; } - TsBlockBuilder tsBlockBuilder = - new TsBlockBuilder(1, DatasetHeaderFactory.getShowDiskUsageHeader().getRespDataTypes()); - tsBlockBuilder.getTimeColumnBuilder().writeLong(0); - tsBlockBuilder.getValueColumnBuilders()[0].writeInt( - IoTDBDescriptor.getInstance().getConfig().getDataNodeId()); - tsBlockBuilder.getValueColumnBuilders()[1].writeLong(result); - tsBlockBuilder.declarePosition(); return tsBlockBuilder.build(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java index d2a5cb21ca3..9c9ebe25ed4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java @@ -138,6 +138,7 @@ public class InformationSchemaContentSupplierFactory { private InformationSchemaContentSupplierFactory() {} public static IInformationSchemaContentSupplier 
getSupplier( + final OperatorContext context, final String tableName, final List<TSDataType> dataTypes, final UserEntity userEntity, @@ -181,7 +182,7 @@ public class InformationSchemaContentSupplierFactory { return new DataNodesSupplier(dataTypes, userEntity); case InformationSchema.TABLE_DISK_USAGE: return new TableDiskUsageSupplier( - dataTypes, userEntity, pushDownFilter, paginationController); + dataTypes, userEntity, pushDownFilter, paginationController, context); default: throw new UnsupportedOperationException("Unknown table: " + tableName); } @@ -1246,6 +1247,7 @@ public class InformationSchemaContentSupplierFactory { private final Map<String, List<TTableInfo>> databaseTableInfoMap; private final Filter pushDownFilter; private final PaginationController paginationController; + private final OperatorContext operatorContext; private DataRegion currentDataRegion; private long currentTimePartition; @@ -1257,12 +1259,14 @@ public class InformationSchemaContentSupplierFactory { private TableDiskUsageSupplier( final List<TSDataType> dataTypes, final UserEntity userEntity, - Filter pushDownFilter, - PaginationController paginationController) + final Filter pushDownFilter, + final PaginationController paginationController, + final OperatorContext operatorContext) throws TException, ClientManagerException { this.dataTypes = dataTypes; this.pushDownFilter = pushDownFilter; this.paginationController = paginationController; + this.operatorContext = operatorContext; AuthorityChecker.getAccessControl().checkUserGlobalSysPrivilege(userEntity); try (final ConfigNodeClient client = ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { @@ -1288,19 +1292,22 @@ public class InformationSchemaContentSupplierFactory { @Override public boolean hasNext() { - if (!paginationController.hasCurLimit()) { - return false; - } if (statisticUtil != null) { return true; } + if (!paginationController.hasCurLimit()) { + return false; + } try { if 
(timePartitionIterator.next()) { currentDataRegion = timePartitionIterator.currentDataRegion(); currentTimePartition = timePartitionIterator.currentTimePartition(); statisticUtil = new TableDiskUsageStatisticUtil( - currentDataRegion.getTsFileManager(), currentTimePartition, currentTablesToScan); + currentDataRegion.getTsFileManager(), + currentTimePartition, + currentTablesToScan, + Optional.ofNullable(operatorContext.getInstanceContext())); return true; } return false; @@ -1317,21 +1324,28 @@ public class InformationSchemaContentSupplierFactory { return Collections.emptyList(); } - if (pushDownFilter == null) { - return tTableInfos.stream().map(TTableInfo::getTableName).collect(Collectors.toList()); - } - List<String> tablesToScan = new ArrayList<>(tTableInfos.size()); for (TTableInfo tTableInfo : tTableInfos) { - Object[] row = new Object[5]; - row[0] = new Binary(dataRegion.getDatabaseName(), TSFileConfig.STRING_CHARSET); - row[1] = new Binary(tTableInfo.getTableName(), TSFileConfig.STRING_CHARSET); - row[2] = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); - row[3] = Integer.parseInt(dataRegion.getDataRegionId()); - row[4] = timePartition; - if (pushDownFilter.satisfyRow(0, row)) { - tablesToScan.add(tTableInfo.getTableName()); + if (pushDownFilter != null) { + Object[] row = new Object[5]; + row[0] = new Binary(dataRegion.getDatabaseName(), TSFileConfig.STRING_CHARSET); + row[1] = new Binary(tTableInfo.getTableName(), TSFileConfig.STRING_CHARSET); + row[2] = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); + row[3] = Integer.parseInt(dataRegion.getDataRegionId()); + row[4] = timePartition; + if (!pushDownFilter.satisfyRow(0, row)) { + continue; + } + } + if (!paginationController.hasCurLimit()) { + break; } + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + continue; + } + paginationController.consumeLimit(); + tablesToScan.add(tTableInfo.getTableName()); } return tablesToScan; } @@ -1358,13 +1372,6 @@ 
public class InformationSchemaContentSupplierFactory { long[] resultArr = statisticUtil.getResult(); for (int i = 0; i < currentTablesToScan.size(); i++) { - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - continue; - } - if (!paginationController.hasCurLimit()) { - break; - } builder.getTimeColumnBuilder().writeLong(0); ColumnBuilder[] columns = builder.getValueColumnBuilders(); @@ -1376,7 +1383,6 @@ public class InformationSchemaContentSupplierFactory { columns[4].writeLong(currentTimePartition); columns[5].writeLong(resultArr[i]); builder.declarePosition(); - paginationController.consumeLimit(); } closeStatisticUtil(); return builder.build(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 47435364c2a..af6c650ddf6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -3780,7 +3780,12 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> analysis.setSourceExpressions(sourceExpressions); sourceExpressions.forEach(expression -> analyzeExpressionType(analysis, expression)); - analyzeWhere(analysis, showQueriesStatement); + if (!analyzeWhere( + analysis, + showQueriesStatement.getWhereCondition(), + ColumnHeaderConstant.showQueriesColumnHeaders)) { + showQueriesStatement.setWhereCondition(null); + } analysis.setMergeOrderParameter(new OrderByParameter(showQueriesStatement.getSortItemList())); @@ -3790,7 +3795,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> @Override public Analysis visitShowDiskUsage( ShowDiskUsageStatement showDiskUsageStatement, MPPQueryContext context) { - context.setTimeOut(Long.MAX_VALUE); Analysis analysis = new 
Analysis(); analysis.setRealStatement(showDiskUsageStatement); analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowDiskUsageHeader()); @@ -3811,27 +3815,47 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> analysis.setSourceExpressions(sourceExpressions); sourceExpressions.forEach(expression -> analyzeExpressionType(analysis, expression)); + if (!analyzeWhere( + analysis, + showDiskUsageStatement.getWhereCondition(), + ColumnHeaderConstant.showDiskUsageColumnHeaders)) { + showDiskUsageStatement.setWhereCondition(null); + } + analysis.setMergeOrderParameter(new OrderByParameter(showDiskUsageStatement.getSortItemList())); return analysis; } - private void analyzeWhere(Analysis analysis, ShowQueriesStatement showQueriesStatement) { - WhereCondition whereCondition = showQueriesStatement.getWhereCondition(); + private boolean analyzeWhere( + Analysis analysis, WhereCondition whereCondition, List<ColumnHeader> statementColumnHeaders) { if (whereCondition == null) { - return; + return true; } - Expression whereExpression = + Expression predicate = ExpressionAnalyzer.bindTypeForTimeSeriesOperand( - whereCondition.getPredicate(), ColumnHeaderConstant.showQueriesColumnHeaders); + whereCondition.getPredicate(), statementColumnHeaders); + Pair<Expression, Boolean> resultPair = + PredicateUtils.extractGlobalTimePredicate(predicate, true, true); + boolean hasValueFilter = resultPair.getRight(); - TSDataType outputType = analyzeExpressionType(analysis, whereExpression); + predicate = PredicateUtils.simplifyPredicate(predicate); + + // set where condition to null if predicate is true or don't have value filter + if (!hasValueFilter || predicate.equals(ConstantOperand.TRUE)) { + return false; + } else { + whereCondition.setPredicate(predicate); + } + TSDataType outputType = analyzeExpressionType(analysis, predicate); if (outputType != TSDataType.BOOLEAN) { throw new SemanticException(String.format(WHERE_WRONG_TYPE_ERROR_MSG, outputType)); } - 
analysis.setWhereExpression(whereExpression); + analysis.setWhereExpression(predicate); + analysis.setHasValueFilter(true); + return true; } // Region view diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java index e041bb4457a..43b0b00f98c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java @@ -33,10 +33,12 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChild import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TreeCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TwoChildProcessNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy; import org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent; @@ -69,7 +71,11 @@ public class LimitOffsetPushDown implements PlanOptimizer { @Override public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext context) { - if (analysis.getTreeStatement().getType() 
!= StatementType.QUERY) { + StatementType statementType = analysis.getTreeStatement().getType(); + if (statementType == StatementType.SHOW_DISK_USAGE) { + return plan.accept(new Rewriter(), new RewriterContext(analysis)); + } + if (statementType != StatementType.QUERY) { return plan; } QueryStatement queryStatement = analysis.getQueryStatement(); @@ -174,6 +180,21 @@ public class LimitOffsetPushDown implements PlanOptimizer { return node; } + @Override + public PlanNode visitCollect(TreeCollectNode node, RewriterContext context) { + PlanNode newNode = node.clone(); + RewriterContext subContext = new RewriterContext(context.getAnalysis()); + subContext.setLimit(context.getLimit() + context.getOffset()); + for (PlanNode child : node.getChildren()) { + newNode.addChild(child.accept(this, subContext)); + } + if (node.getChildren().size() > 1) { + // keep parent limit/offset node + context.setEnablePushDown(false); + } + return newNode; + } + @Override public PlanNode visitTwoChildProcess(TwoChildProcessNode node, RewriterContext context) { context.setEnablePushDown(false); @@ -204,6 +225,15 @@ public class LimitOffsetPushDown implements PlanOptimizer { } return node; } + + @Override + public PlanNode visitShowDiskUsage(ShowDiskUsageNode node, RewriterContext context) { + if (context.isEnablePushDown()) { + node.setPushDownLimit(context.getLimit()); + node.setPushDownOffset(context.getOffset()); + } + return node; + } } private static class RewriterContext { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java index ed15f82ea4b..4c91f3339ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java @@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.queryengine.plan.optimization; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; @@ -29,6 +30,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer; import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; import org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo; import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; @@ -43,6 +45,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftO import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanSourceNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; @@ -52,7 +55,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import static 
com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -62,7 +67,11 @@ public class PredicatePushDown implements PlanOptimizer { @Override public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext context) { - if (analysis.getTreeStatement().getType() != StatementType.QUERY) { + StatementType statementType = analysis.getTreeStatement().getType(); + if (statementType == StatementType.SHOW_DISK_USAGE) { + return plan.accept(new Rewriter(), new RewriterContext(analysis, context, false)); + } + if (statementType != StatementType.QUERY) { return plan; } QueryStatement queryStatement = analysis.getQueryStatement(); @@ -329,6 +338,44 @@ public class PredicatePushDown implements PlanOptimizer { node.setPushDownPredicate(PredicateUtils.combineConjuncts(canPushDownConjuncts)); context.setEnablePushDown(true); + if (cannotPushDownConjuncts.isEmpty()) { + // all conjuncts can be push down + return node; + } else { + return planFilter( + node, PredicateUtils.combineConjuncts(cannotPushDownConjuncts), context, true); + } + } + + @Override + public PlanNode visitShowDiskUsage(ShowDiskUsageNode node, RewriterContext context) { + if (context.hasNotInheritedPredicate()) { + return node; + } + Expression inheritedPredicate = context.getInheritedPredicate(); + + List<Expression> conjuncts = PredicateUtils.extractConjuncts(inheritedPredicate); + List<Expression> canPushDownConjuncts = new ArrayList<>(); + List<Expression> cannotPushDownConjuncts = new ArrayList<>(); + for (Expression conjunct : conjuncts) { + + if (PredicateUtils.predicateCanPushDownToSource(conjunct) + && !extractSymbolsFromExpression(conjunct) + .contains(ColumnHeaderConstant.SIZE_IN_BYTES)) { + canPushDownConjuncts.add(conjunct); + } else { + cannotPushDownConjuncts.add(conjunct); + } + } + + if (canPushDownConjuncts.isEmpty()) { + // cannot push down + return node; + } + + 
node.setPushDownPredicate(PredicateUtils.combineConjuncts(canPushDownConjuncts)); + context.setEnablePushDown(true); + if (cannotPushDownConjuncts.isEmpty()) { // all conjuncts can be push down PlanNode resultNode = planTransform(node, context); @@ -340,6 +387,13 @@ public class PredicatePushDown implements PlanOptimizer { } } + private Set<String> extractSymbolsFromExpression(Expression expression) { + List<Expression> sourceExpressions = ExpressionAnalyzer.searchSourceExpressions(expression); + return sourceExpressions.stream() + .map(e -> ((TimeSeriesOperand) e).getPath().toString()) + .collect(Collectors.toSet()); + } + private PlanNode planTransform(PlanNode resultNode, RewriterContext context) { FilterNode pushDownFilterNode = context.getPushDownFilterNode(); Expression[] outputExpressions = pushDownFilterNode.getOutputExpressions(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 3909cf9e531..57748e12b1d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -3660,14 +3660,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { // parse ORDER BY if (ctx.orderByClause() != null) { showQueriesStatement.setOrderByComponent( - parseOrderByClause( - ctx.orderByClause(), - ImmutableSet.of( - OrderByKey.TIME, - OrderByKey.QUERYID, - OrderByKey.DATANODEID, - OrderByKey.ELAPSEDTIME, - OrderByKey.STATEMENT))); + parseOrderByClause(ctx.orderByClause(), ImmutableSet.of(OrderByKey.TIME))); } // parse LIMIT & OFFSET @@ -3687,7 +3680,28 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { @Override public Statement visitShowDiskUsage(IoTDBSqlParser.ShowDiskUsageContext ctx) { PartialPath pathPattern = 
parsePrefixPath(ctx.prefixPath()); - return new ShowDiskUsageStatement(pathPattern); + ShowDiskUsageStatement showDiskUsageStatement = new ShowDiskUsageStatement(pathPattern); + if (ctx.whereClause() != null) { + showDiskUsageStatement.setWhereCondition(parseWhereClause(ctx.whereClause())); + } + + // parse ORDER BY + if (ctx.orderByClause() != null) { + showDiskUsageStatement.setOrderByComponent( + parseOrderByClause(ctx.orderByClause(), ImmutableSet.of())); + } + + // parse LIMIT & OFFSET + if (ctx.rowPaginationClause() != null) { + if (ctx.rowPaginationClause().limitClause() != null) { + showDiskUsageStatement.setLimit(parseLimitClause(ctx.rowPaginationClause().limitClause())); + } + if (ctx.rowPaginationClause().offsetClause() != null) { + showDiskUsageStatement.setOffset( + parseOffsetClause(ctx.rowPaginationClause().offsetClause())); + } + } + return showDiskUsageStatement; } // show region diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index 7480ca0e432..5657eeeb38d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -78,6 +78,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TreeCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; @@ -1288,8 +1289,31 @@ public class LogicalPlanBuilder { List<TDataNodeLocation> dataNodeLocations = analysis.getReadableDataNodeLocations(); if (dataNodeLocations.size() == 1) { this.root = - new ShowDiskUsageNode( - context.getQueryId().genPlanNodeId(), dataNodeLocations.get(0), pathPattern); + planSingleShowDiskUsage(dataNodeLocations.get(0), pathPattern) + .planFilterAndTransform( + analysis.getWhereExpression(), + analysis.getSourceExpressions(), + false, + Ordering.ASC, + true) + .getRoot(); + } else if (analysis.getMergeOrderParameter().isEmpty()) { + TreeCollectNode collectNode = + new TreeCollectNode( + context.getQueryId().genPlanNodeId(), + ShowDiskUsageNode.SHOW_DISK_USAGE_HEADER_COLUMNS); + dataNodeLocations.forEach( + dataNodeLocation -> + collectNode.addChild( + planSingleShowDiskUsage(dataNodeLocation, pathPattern) + .planFilterAndTransform( + analysis.getWhereExpression(), + analysis.getSourceExpressions(), + false, + Ordering.ASC, + true) + .getRoot())); + this.root = collectNode; } else { MergeSortNode mergeSortNode = new MergeSortNode( @@ -1299,8 +1323,15 @@ public class LogicalPlanBuilder { dataNodeLocations.forEach( dataNodeLocation -> mergeSortNode.addChild( - new ShowDiskUsageNode( - context.getQueryId().genPlanNodeId(), dataNodeLocation, pathPattern))); + planSingleShowDiskUsage(dataNodeLocation, pathPattern) + .planFilterAndTransform( + analysis.getWhereExpression(), + analysis.getSourceExpressions(), + false, + Ordering.ASC, + true) + .planSort(analysis.getMergeOrderParameter()) + .getRoot())); this.root = mergeSortNode; } @@ -1312,6 +1343,13 @@ public class LogicalPlanBuilder { return this; } + private LogicalPlanBuilder planSingleShowDiskUsage( + TDataNodeLocation dataNodeLocation, PartialPath pathPattern) { + this.root = + new 
ShowDiskUsageNode(context.getQueryId().genPlanNodeId(), dataNodeLocation, pathPattern); + return this; + } + public LogicalPlanBuilder planOrderBy(List<SortItem> sortItemList) { if (sortItemList.isEmpty()) { return this; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index c6d607717a9..a4ceaf29274 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -960,7 +960,11 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte public PlanNode visitShowDiskUsage( ShowDiskUsageStatement showDiskUsageStatement, MPPQueryContext context) { LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); - planBuilder = planBuilder.planShowDiskUsage(analysis, showDiskUsageStatement.getPathPattern()); + planBuilder = + planBuilder + .planShowDiskUsage(analysis, showDiskUsageStatement.getPathPattern()) + .planOffset(showDiskUsageStatement.getOffset()) + .planLimit(showDiskUsageStatement.getLimit()); return planBuilder.getRoot(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index acd5e80f515..c144fb89c7a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -57,6 +57,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.process.ActiveRegionScanMergeOperator; 
import org.apache.iotdb.db.queryengine.execution.operator.process.AggregationMergeSortOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator;
+import org.apache.iotdb.db.queryengine.execution.operator.process.CollectOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.process.ColumnInjectOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewIntoOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewOperator;
@@ -220,6 +221,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TreeCollectNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TwoChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode;
@@ -285,6 +287,7 @@ import org.apache.tsfile.read.common.block.column.TimeColumn;
 import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.read.filter.operator.TimeFilterOperators.TimeGt;
 import org.apache.tsfile.read.filter.operator.TimeFilterOperators.TimeGtEq;
+import org.apache.tsfile.read.reader.series.PaginationController;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TimeDuration;
@@ -1172,6 +1175,20 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList));
   }
 
+  /**
+   * Creates the {@link CollectOperator} for a {@link TreeCollectNode} from its child operators.
+   */
+  @Override
+  public Operator visitCollect(TreeCollectNode node, LocalExecutionPlanContext context) {
+    String operatorName = CollectOperator.class.getSimpleName();
+    OperatorContext collectContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(context.getNextOperatorId(), node.getPlanNodeId(), operatorName);
+    return new CollectOperator(
+        collectContext, dealWithConsumeAllChildrenPipelineBreaker(node, context));
+  }
+
   @Override
   public Operator visitAggregationMergeSort(
       AggregationMergeSortNode node, LocalExecutionPlanContext context) {
@@ -2604,7 +2621,21 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 ShowDiskUsageOperator.class.getSimpleName());
-    return new ShowDiskUsageOperator(operatorContext, node.getPlanNodeId(), node.getPathPattern());
+    Filter pushDownFilter =
+        convertPredicateToFilter(
+            node.getPushDownPredicate(),
+            node.getOutputColumnNames(),
+            false,
+            context.getTypeProvider(),
+            context.getZoneId());
+    PaginationController paginationController =
+        new PaginationController(node.getPushDownLimit(), node.getPushDownOffset());
+    return new ShowDiskUsageOperator(
+        operatorContext,
+        node.getPlanNodeId(),
+        node.getPathPattern(),
+        pushDownFilter,
+        paginationController);
   }
 
   private List<OutputColumn> generateOutputColumnsFromChildren(MultiChildProcessNode node) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 0db3dd3d557..ba157772439 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -1312,6 +1312,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution
             operatorContext,
             node.getPlanNodeId(),
             getSupplier(
+ operatorContext, node.getQualifiedObjectName().getObjectName(), dataTypes, context diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java index 8f36e44361a..6ccf6c1b214 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java @@ -54,6 +54,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TreeCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode; @@ -239,6 +240,11 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { return processMultiChildNode(node, context); } + @Override + public PlanNode visitCollect(TreeCollectNode node, NodeGroupContext context) { + return processMultiChildNode(node, context); + } + @Override public PlanNode visitTopK(TopKNode node, NodeGroupContext context) { return processMultiChildNode(node, context); @@ -567,10 +573,12 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { PlanNode newNode = node.clone(); PlanNode child = visit(node.getChildren().get(0), context); newNode.addChild(child); - TRegionReplicaSet 
dataRegion = context.getNodeDistribution(child.getPlanNodeId()).getRegion();
+    NodeDistribution nodeDistribution = context.getNodeDistribution(child.getPlanNodeId());
     context.putNodeDistribution(
         newNode.getPlanNodeId(),
-        new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, dataRegion));
+        new NodeDistribution(
+            NodeDistributionType.SAME_WITH_ALL_CHILDREN,
+            nodeDistribution == null ? null : nodeDistribution.getRegion()));
     return newNode;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 79489012210..abe0e0ef76b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -61,6 +61,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDevi
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TreeCollectNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
@@ -140,6 +141,18 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext>
     return Collections.singletonList(newRoot);
   }
 
+  @Override
+  public List<PlanNode> visitCollect(TreeCollectNode node, DistributionPlanContext context) {
+    TreeCollectNode collectRoot =
+        new TreeCollectNode(
+            context.queryContext.getQueryId().genPlanNodeId(), node.getOutputColumnNames());
+    // Each child may be rewritten into several nodes; attach all of them, in order.
+    for (PlanNode child : node.getChildren()) {
+      rewrite(child, context).forEach(collectRoot::addChild);
+    }
+    return Collections.singletonList(collectRoot);
+  }
+
   private MergeSortNode cloneMergeSortNodeWithoutChild(
       MergeSortNode node, DistributionPlanContext context) {
     return new MergeSortNode(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 6ffc534a712..fbca56e512b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TreeCollectNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode;
@@ -264,6 +265,14 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
     return render(node, boxValue, context);
   }
 
+  @Override
+  public List<String> visitCollect(TreeCollectNode node, GraphContext context) {
+    List<String> boxValue = new ArrayList<>();
+
boxValue.add(String.format("Collect-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("ChildrenCount: %d", node.getChildren().size())); + return render(node, boxValue, context); + } + @Override public List<String> visitTopK(TopKNode node, GraphContext context) { List<String> boxValue = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 40bff077a0c..d50a114e5cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -87,6 +87,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TreeCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode; @@ -263,6 +264,7 @@ public enum PlanNodeType { LAST_QUERY_SCAN((short) 98), SHOW_DISK_USAGE((short) 99), + TREE_COLLECT((short) 100), CREATE_OR_UPDATE_TABLE_DEVICE((short) 902), TABLE_DEVICE_QUERY_SCAN((short) 903), @@ -591,6 +593,8 @@ public enum PlanNodeType { return LastQueryScanNode.deserialize(buffer); case 99: return ShowDiskUsageNode.deserialize(buffer); + case 100: + return TreeCollectNode.deserialize(buffer); case 902: return 
CreateOrUpdateTableDeviceNode.deserialize(buffer); case 903: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 917b9654ce7..d8c2b6d03c9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -87,6 +87,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TreeCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TwoChildProcessNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode; @@ -355,6 +356,10 @@ public abstract class PlanVisitor<R, C> { return visitMultiChildProcess(node, context); } + public R visitCollect(TreeCollectNode node, C context) { + return visitMultiChildProcess(node, context); + } + public R visitTopK(TopKNode node, C context) { return visitMultiChildProcess(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TreeCollectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TreeCollectNode.java new file mode 100644 index 00000000000..99595c45192 --- /dev/null +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TreeCollectNode.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+
+import com.google.common.base.Objects;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Multi-child process node whose only attribute is the list of output column names; visitors
+ * reach it through {@link PlanVisitor#visitCollect}.
+ */
+public class TreeCollectNode extends MultiChildProcessNode {
+
+  private final List<String> outputColumnNames;
+
+  public TreeCollectNode(PlanNodeId id, List<String> outputColumnNames) {
+    super(id);
+    this.outputColumnNames = outputColumnNames;
+  }
+
+  public TreeCollectNode(PlanNodeId id, List<PlanNode> children, List<String> outputColumnNames) {
+    super(id, children);
+    this.outputColumnNames = outputColumnNames;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitCollect(this, context);
+  }
+
+  /** Returns a new node with the same id and output column names; no children are passed on. */
+  @Override
+  public PlanNode clone() {
+    return new TreeCollectNode(id, outputColumnNames);
+  }
+
+  @Override
+  public PlanNodeType getType() {
+    return PlanNodeType.TREE_COLLECT;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return outputColumnNames;
+  }
+
+  @Override
+  public PlanNode replaceChildren(List<PlanNode> newChildren) {
+    checkArgument(children.size() == newChildren.size(), "wrong number of new children");
+    return new TreeCollectNode(id, newChildren, outputColumnNames);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.TREE_COLLECT.serialize(byteBuffer);
+    ReadWriteIOUtils.write(outputColumnNames.size(), byteBuffer);
+    outputColumnNames.forEach(column -> ReadWriteIOUtils.write(column, byteBuffer));
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.TREE_COLLECT.serialize(stream);
+    ReadWriteIOUtils.write(outputColumnNames.size(), stream);
+    for (String outputColumnName : outputColumnNames) {
+      ReadWriteIOUtils.write(outputColumnName, stream);
+    }
+  }
+
+  /** Reads the output column names (count first) and then the plan node id from the buffer. */
+  public static TreeCollectNode deserialize(ByteBuffer byteBuffer) {
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<String> outputColumnNames = new ArrayList<>(size);
+    while (size-- > 0) {
+      outputColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
+    }
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new TreeCollectNode(planNodeId, outputColumnNames);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+    // Beyond the parent's comparison, the nodes must agree on their output columns.
+    TreeCollectNode that = (TreeCollectNode) o;
+    return Objects.equal(outputColumnNames, that.outputColumnNames);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(super.hashCode(), outputColumnNames);
+  }
+
+  @Override
+  public String toString() {
+    return "CollectNode-" + this.getPlanNodeId();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowDiskUsageNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowDiskUsageNode.java
index 843c4a8d7d9..18a0e3687e0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowDiskUsageNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowDiskUsageNode.java
@@ -23,12 +23,14 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathDeserializeUtil;
 import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -40,9 +42,17 @@ import java.util.Objects;
 public class ShowDiskUsageNode extends VirtualSourceNode {
 
   public static final List<String> SHOW_DISK_USAGE_HEADER_COLUMNS =
-      ImmutableList.of(ColumnHeaderConstant.DATA_NODE_ID, ColumnHeaderConstant.SIZE_IN_BYTES);
+      ImmutableList.of(
+          ColumnHeaderConstant.DATABASE,
+          ColumnHeaderConstant.DATA_NODE_ID,
+          ColumnHeaderConstant.REGION_ID,
+          ColumnHeaderConstant.TIME_PARTITION,
+          ColumnHeaderConstant.SIZE_IN_BYTES);
 
   private final PartialPath pathPattern;
+  private Expression pushDownPredicate;
+  private long pushDownLimit;
+
private long pushDownOffset; public ShowDiskUsageNode( PlanNodeId id, TDataNodeLocation dataNodeLocation, PartialPath pathPattern) { @@ -50,10 +60,48 @@ public class ShowDiskUsageNode extends VirtualSourceNode { this.pathPattern = pathPattern; } + public ShowDiskUsageNode( + PlanNodeId id, + TDataNodeLocation dataNodeLocation, + PartialPath pathPattern, + Expression pushDownPredicate, + long pushDownLimit, + long pushDownOffset) { + super(id, dataNodeLocation); + this.pathPattern = pathPattern; + this.pushDownPredicate = pushDownPredicate; + this.pushDownLimit = pushDownLimit; + this.pushDownOffset = pushDownOffset; + } + public PartialPath getPathPattern() { return pathPattern; } + public Expression getPushDownPredicate() { + return pushDownPredicate; + } + + public void setPushDownPredicate(Expression pushDownPredicate) { + this.pushDownPredicate = pushDownPredicate; + } + + public long getPushDownLimit() { + return pushDownLimit; + } + + public void setPushDownLimit(long pushDownLimit) { + this.pushDownLimit = pushDownLimit; + } + + public long getPushDownOffset() { + return pushDownOffset; + } + + public void setPushDownOffset(long pushDownOffset) { + this.pushDownOffset = pushDownOffset; + } + @Override public List<PlanNode> getChildren() { return Collections.emptyList(); @@ -71,7 +119,13 @@ public class ShowDiskUsageNode extends VirtualSourceNode { @Override public PlanNode clone() { - return new ShowDiskUsageNode(getPlanNodeId(), getDataNodeLocation(), pathPattern); + return new ShowDiskUsageNode( + getPlanNodeId(), + getDataNodeLocation(), + pathPattern, + pushDownPredicate, + pushDownLimit, + pushDownOffset); } @Override @@ -95,18 +149,41 @@ public class ShowDiskUsageNode extends VirtualSourceNode { protected void serializeAttributes(ByteBuffer byteBuffer) { PlanNodeType.SHOW_DISK_USAGE.serialize(byteBuffer); pathPattern.serialize(byteBuffer); + if (pushDownPredicate == null) { + ReadWriteIOUtils.write((byte) 0, byteBuffer); + } else { + 
ReadWriteIOUtils.write((byte) 1, byteBuffer); + Expression.serialize(pushDownPredicate, byteBuffer); + } + ReadWriteIOUtils.write(pushDownLimit, byteBuffer); + ReadWriteIOUtils.write(pushDownOffset, byteBuffer); } @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { PlanNodeType.SHOW_DISK_USAGE.serialize(stream); pathPattern.serialize(stream); + if (pushDownPredicate == null) { + ReadWriteIOUtils.write((byte) 0, stream); + } else { + ReadWriteIOUtils.write((byte) 1, stream); + Expression.serialize(pushDownPredicate, stream); + } + ReadWriteIOUtils.write(pushDownLimit, stream); + ReadWriteIOUtils.write(pushDownOffset, stream); } public static ShowDiskUsageNode deserialize(ByteBuffer byteBuffer) { - PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); PartialPath pathPattern = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer); - return new ShowDiskUsageNode(planNodeId, null, pathPattern); + byte isNull = ReadWriteIOUtils.readByte(byteBuffer); + Expression pushDownPredicate = null; + if (isNull == 1) { + pushDownPredicate = Expression.deserialize(byteBuffer); + } + long limit = ReadWriteIOUtils.readLong(byteBuffer); + long offset = ReadWriteIOUtils.readLong(byteBuffer); + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new ShowDiskUsageNode(planNodeId, null, pathPattern, pushDownPredicate, limit, offset); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowQueriesNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowQueriesNode.java index c5925f93480..25e94c0da5c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowQueriesNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowQueriesNode.java @@ -105,8 +105,8 @@ public class ShowQueriesNode extends 
VirtualSourceNode { } public static ShowQueriesNode deserialize(ByteBuffer byteBuffer) { - PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); String allowedUsername = ReadWriteIOUtils.readString(byteBuffer); + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); return new ShowQueriesNode(planNodeId, null, allowedUsername); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java index 5b21d8ffecf..3f7c5322c8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java @@ -77,8 +77,7 @@ public class Analyzer { statement, parameters, parameterLookup, - warningCollector, - context); + warningCollector); Analysis analysis = new Analysis(rewrittenStatement, parameterLookup); Statement innerStatement = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index af42677b565..b1e233c02dd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner; -import org.apache.iotdb.commons.schema.table.InformationSchema; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -317,9 +316,6 @@ public class RelationPlanner extends AstVisitor<RelationPlan, 
Void> { tableScanNode = new InformationSchemaTableScanNode( idAllocator.genPlanNodeId(), qualifiedObjectName, outputSymbols, tableColumnSchema); - if (InformationSchema.hasUnlimitedQueryTimeOut(qualifiedObjectName.getObjectName())) { - queryContext.setTimeOut(Long.MAX_VALUE); - } } else { tableScanNode = new DeviceTableScanNode( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java index 4ee56d48e18..5b43b467a36 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java @@ -425,10 +425,6 @@ public abstract class AstVisitor<R, C> { return visitStatement(node, context); } - protected R visitShowDiskUsageOfTable(ShowDiskUsageOfTable node, C context) { - return visitStatement(node, context); - } - protected R visitSetProperties(SetProperties node, C context) { return visitStatement(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowDiskUsageOfTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowDiskUsageOfTable.java deleted file mode 100644 index 83acd147d0e..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowDiskUsageOfTable.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; - -import java.util.Optional; - -public class ShowDiskUsageOfTable extends ShowStatement { - - public ShowDiskUsageOfTable( - NodeLocation location, - String tableName, - Optional<Expression> where, - Optional<OrderBy> orderBy, - Optional<Offset> offset, - Optional<Node> limit) { - super(location, tableName, where, orderBy, offset, limit); - } - - @Override - public <R, C> R accept(AstVisitor<R, C> visitor, C context) { - return visitor.visitShowDiskUsageOfTable(this, context); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index d29a23dba05..76107bfa30e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.cache.CacheClearOptions; -import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; import org.apache.iotdb.commons.schema.table.InformationSchema; import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; @@ -188,7 
+187,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowCurrentUser; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDataNodes; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice; -import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDiskUsageOfTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowFunctions; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowIndex; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowLoadedModels; @@ -1607,37 +1605,6 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { limit); } - @Override - public Node visitShowDiskUsageStatement(RelationalSqlParser.ShowDiskUsageStatementContext ctx) { - QualifiedName qualifiedName = getQualifiedName(ctx.tableName); - - if (!qualifiedName.getPrefix().isPresent()) { - throw new SemanticException("database is not specified"); - } - String database = qualifiedName.getPrefix().get().toString(); - String table = qualifiedName.getSuffix(); - Optional<Expression> where = - Optional.of( - LogicalExpression.and( - new ComparisonExpression( - getLocation(ctx), - ComparisonExpression.Operator.EQUAL, - new Identifier(ColumnHeaderConstant.DATABASE.toLowerCase()), - new StringLiteral(database)), - new ComparisonExpression( - getLocation(ctx), - ComparisonExpression.Operator.EQUAL, - new Identifier(ColumnHeaderConstant.TABLE_NAME_TABLE_MODEL), - new StringLiteral(table)))); - return new ShowDiskUsageOfTable( - getLocation(ctx), - InformationSchema.TABLE_DISK_USAGE, - where, - Optional.empty(), - Optional.empty(), - Optional.empty()); - } - @Override public Node visitKillQueryStatement(RelationalSqlParser.KillQueryStatementContext ctx) { if (ctx.queryId == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/rewrite/ShowRewrite.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/rewrite/ShowRewrite.java index f3235f2f304..63693ae099f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/rewrite/ShowRewrite.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/rewrite/ShowRewrite.java @@ -19,9 +19,6 @@ package org.apache.iotdb.db.queryengine.plan.relational.sql.rewrite; -import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; -import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction; -import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; @@ -31,17 +28,14 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountStatement; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall; -import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupBy; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Relation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Select; -import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDiskUsageOfTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowQueriesStatement; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowStatement; -import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleGroupBy; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SingleColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; @@ -64,52 +58,20 @@ public final class ShowRewrite implements StatementRewrite.Rewrite { final Statement node, final List<Expression> parameters, final Map<NodeRef<Parameter>, Expression> parameterLookup, - final WarningCollector warningCollector, - final MPPQueryContext queryContext) { + final WarningCollector warningCollector) { final Visitor visitor = new Visitor(); - return (Statement) visitor.process(node, null); + return (Statement) visitor.process(node); } - private static class Visitor extends AstVisitor<Node, MPPQueryContext> { + private static class Visitor extends AstVisitor<Node, Void> { @Override - protected Node visitShowQueriesStatement(ShowQueriesStatement node, MPPQueryContext context) { + protected Node visitShowQueriesStatement(ShowQueriesStatement node, Void context) { return visitShowStatement(node, context); } @Override - protected Node visitShowDiskUsageOfTable(ShowDiskUsageOfTable node, MPPQueryContext context) { - if (context != null) { - context.setTimeOut(Long.MAX_VALUE); - } - return simpleQuery( - selectList( - new SingleColumn(new Identifier(ColumnHeaderConstant.NODE_ID_TABLE_MODEL)), - new SingleColumn( - new FunctionCall( - QualifiedName.of(TableBuiltinAggregationFunction.SUM.getFunctionName()), - Collections.singletonList( - new Identifier(ColumnHeaderConstant.SIZE_IN_BYTES_TABLE_MODEL))), - new Identifier(ColumnHeaderConstant.SIZE_IN_BYTES_TABLE_MODEL))), - from(INFORMATION_DATABASE, node.getTableName()), - node.getWhere(), - Optional.of( - new GroupBy( - false, - Collections.singletonList( - new SimpleGroupBy( - Collections.singletonList( - new Identifier(ColumnHeaderConstant.NODE_ID_TABLE_MODEL)))))), - Optional.empty(), - Optional.empty(), - node.getOrderBy(), - node.getOffset(), - node.getLimit()); - } - - 
@Override - protected Node visitShowStatement( - final ShowStatement showStatement, final MPPQueryContext context) { + protected Node visitShowStatement(final ShowStatement showStatement, final Void context) { return simpleQuery( selectList(new AllColumns()), from(INFORMATION_DATABASE, showStatement.getTableName()), @@ -123,8 +85,7 @@ public final class ShowRewrite implements StatementRewrite.Rewrite { } @Override - protected Node visitCountStatement( - final CountStatement countStatement, final MPPQueryContext context) { + protected Node visitCountStatement(final CountStatement countStatement, Void context) { return simpleQuery( new Select( false, @@ -149,7 +110,7 @@ public final class ShowRewrite implements StatementRewrite.Rewrite { } @Override - protected Node visitNode(final Node node, final MPPQueryContext context) { + protected Node visitNode(final Node node, final Void context) { return node; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/rewrite/StatementRewrite.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/rewrite/StatementRewrite.java index 000341fb6cb..66bcd5f4ef1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/rewrite/StatementRewrite.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/rewrite/StatementRewrite.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.queryengine.plan.relational.sql.rewrite; -import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; @@ -49,19 +48,12 @@ public final class StatementRewrite { Statement node, List<Expression> parameters, Map<NodeRef<Parameter>, Expression> parameterLookup, - WarningCollector warningCollector, 
- MPPQueryContext queryContext) { + WarningCollector warningCollector) { for (Rewrite rewrite : rewrites) { node = requireNonNull( rewrite.rewrite( - analyzerFactory, - session, - node, - parameters, - parameterLookup, - warningCollector, - queryContext), + analyzerFactory, session, node, parameters, parameterLookup, warningCollector), "Statement rewrite returned null"); } return node; @@ -74,8 +66,7 @@ public final class StatementRewrite { Statement node, List<Expression> parameters, Map<NodeRef<Parameter>, Expression> parameterLookup, - WarningCollector warningCollector, - MPPQueryContext queryContext); + WarningCollector warningCollector); } public static final StatementRewrite NOOP = new StatementRewrite(ImmutableSet.of()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowDiskUsageStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowDiskUsageStatement.java index e03da031c19..313c8aabb00 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowDiskUsageStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowDiskUsageStatement.java @@ -22,16 +22,18 @@ package org.apache.iotdb.db.queryengine.plan.statement.sys; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; -import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey; -import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByComponent; import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; +import org.apache.iotdb.db.queryengine.plan.statement.component.WhereCondition; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowStatement; 
import java.util.Collections; import java.util.List; public class ShowDiskUsageStatement extends ShowStatement { - private PartialPath pathPattern; + private final PartialPath pathPattern; + private WhereCondition whereCondition; + private OrderByComponent orderByComponent; public ShowDiskUsageStatement(PartialPath pathPattern) { this.statementType = StatementType.SHOW_DISK_USAGE; @@ -42,8 +44,20 @@ public class ShowDiskUsageStatement extends ShowStatement { return pathPattern; } + public void setWhereCondition(WhereCondition whereCondition) { + this.whereCondition = whereCondition; + } + + public WhereCondition getWhereCondition() { + return whereCondition; + } + + public void setOrderByComponent(OrderByComponent orderByComponent) { + this.orderByComponent = orderByComponent; + } + public List<SortItem> getSortItemList() { - return Collections.singletonList(new SortItem(OrderByKey.DATANODEID, Ordering.ASC)); + return orderByComponent == null ? Collections.emptyList() : orderByComponent.getSortItemList(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index 1889182a2db..076d2de5a03 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -275,7 +275,8 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { timeseriesMetadataList, deviceIteratorMap.get(resource).getFirstMeasurementNodeOfCurrentDevice(), schemaMap.keySet(), - true); + true, + null); for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { if 
(!schemaMap.containsKey(timeseriesMetadata.getMeasurementId()) && !timeseriesMetadata.getChunkMetadataList().isEmpty()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java index d515a7fa3a9..d0f3426423c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java @@ -283,7 +283,7 @@ public class RepairDataFileScanUtil { throws IOException { List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); reader.getDeviceTimeseriesMetadata( - timeseriesMetadataList, metadataIndexNode, Collections.emptySet(), true); + timeseriesMetadataList, metadataIndexNode, Collections.emptySet(), true, null); long actualDeviceStartTime = Long.MAX_VALUE; long actualDeviceEndTime = Long.MIN_VALUE; for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java index 42e5c1a1820..954726f0998 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java @@ -19,6 +19,9 @@ package org.apache.iotdb.db.storageengine.dataregion.utils; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -37,17 +40,31 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; +import java.util.Optional; +import java.util.Queue; +import java.util.function.LongConsumer; import java.util.stream.Collectors; import java.util.stream.Stream; public abstract class DiskUsageStatisticUtil implements Closeable { protected static final Logger logger = LoggerFactory.getLogger(DiskUsageStatisticUtil.class); - protected List<TsFileResource> resourcesWithReadLock; + protected Queue<TsFileResource> resourcesWithReadLock; protected final Iterator<TsFileResource> iterator; + protected final LongConsumer timeSeriesMetadataIoSizeRecorder; - public DiskUsageStatisticUtil(TsFileManager tsFileManager, long timePartition) { + public DiskUsageStatisticUtil( + TsFileManager tsFileManager, + long timePartition, + Optional<FragmentInstanceContext> operatorContext) { + this.timeSeriesMetadataIoSizeRecorder = + operatorContext + .<LongConsumer>map( + context -> + context.getQueryStatistics().getLoadTimeSeriesMetadataActualIOSize()::addAndGet) + .orElse(null); List<TsFileResource> seqResources = tsFileManager.getTsFileListSnapshot(timePartition, true); List<TsFileResource> unseqResources = tsFileManager.getTsFileListSnapshot(timePartition, false); List<TsFileResource> resources = @@ -63,7 +80,7 @@ public abstract class DiskUsageStatisticUtil implements Closeable { public abstract long[] getResult(); protected void acquireReadLocks(List<TsFileResource> resources) { - this.resourcesWithReadLock = new ArrayList<>(resources.size()); + this.resourcesWithReadLock = new LinkedList<>(); try { for (TsFileResource resource : resources) { resource.readLock(); @@ -89,7 
+106,32 @@ public abstract class DiskUsageStatisticUtil implements Closeable { resourcesWithReadLock = null; } - public abstract void calculateNextFile(); + public void calculateNextFile() { + TsFileResource tsFileResource = iterator.next(); + try { + if (tsFileResource.isDeleted()) { + return; + } + TsFileSequenceReader reader = + FileReaderManager.getInstance() + .get( + tsFileResource.getTsFilePath(), + tsFileResource.getTsFileID(), + true, + timeSeriesMetadataIoSizeRecorder); + calculateNextFile(tsFileResource, reader); + } catch (Exception e) { + logger.error("Failed to scan file {}", tsFileResource.getTsFile().getAbsolutePath(), e); + } finally { + // this operation including readUnlock + FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, true); + iterator.remove(); + } + } + + protected abstract void calculateNextFile( + TsFileResource tsFileResource, TsFileSequenceReader reader) + throws IOException, IllegalPathException; protected long calculateStartOffsetOfChunkGroup( TsFileSequenceReader reader, @@ -99,11 +141,10 @@ public abstract class DiskUsageStatisticUtil implements Closeable { int chunkGroupHeaderSize = new ChunkGroupHeader(deviceIsAlignedPair.getLeft()).getSerializedSize(); if (deviceIsAlignedPair.getRight()) { - List<TimeseriesMetadata> timeColumnTimeseriesMetadata = new ArrayList<>(1); - reader.readITimeseriesMetadata( - timeColumnTimeseriesMetadata, firstMeasurementNodeOfCurrentDevice, ""); - IChunkMetadata iChunkMetadata = - timeColumnTimeseriesMetadata.get(0).getChunkMetadataList().get(0); + TimeseriesMetadata timeColumnTimeseriesMetadata = + reader.getTimeColumnMetadata( + firstMeasurementNodeOfCurrentDevice, timeSeriesMetadataIoSizeRecorder); + IChunkMetadata iChunkMetadata = timeColumnTimeseriesMetadata.getChunkMetadataList().get(0); return iChunkMetadata.getOffsetOfChunkHeader() - chunkGroupHeaderSize; } else { List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); @@ -111,7 +152,8 @@ public abstract 
class DiskUsageStatisticUtil implements Closeable { timeseriesMetadataList, firstMeasurementNodeOfCurrentDevice, Collections.emptySet(), - true); + true, + timeSeriesMetadataIoSizeRecorder); long minOffset = Long.MAX_VALUE; for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { for (IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java index 43b93cdfe1d..99c1cf227ae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.utils; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -36,6 +37,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; public class TableDiskUsageStatisticUtil extends DiskUsageStatisticUtil { public static final long SHALLOW_SIZE = @@ -44,8 +46,11 @@ public class TableDiskUsageStatisticUtil extends DiskUsageStatisticUtil { private final long[] resultArr; public TableDiskUsageStatisticUtil( - TsFileManager tsFileManager, long timePartition, List<String> tableNames) { - super(tsFileManager, timePartition); + TsFileManager tsFileManager, + long timePartition, + List<String> tableNames, + Optional<FragmentInstanceContext> context) { + super(tsFileManager, timePartition, context); this.tableIndexMap = new HashMap<>(); for (int i = 0; i < 
tableNames.size(); i++) { tableIndexMap.put(tableNames.get(i), i); @@ -59,32 +64,25 @@ public class TableDiskUsageStatisticUtil extends DiskUsageStatisticUtil { } @Override - public void calculateNextFile() { - TsFileResource tsFileResource = iterator.next(); - if (tsFileResource.isDeleted()) { + protected void calculateNextFile(TsFileResource tsFileResource, TsFileSequenceReader reader) + throws IOException { + TsFileMetadata tsFileMetadata = reader.readFileMetadata(); + if (!hasSatisfiedData(tsFileMetadata)) { return; } - try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFileResource.getTsFilePath())) { - TsFileMetadata tsFileMetadata = reader.readFileMetadata(); - if (!hasSatisfiedData(tsFileMetadata)) { - return; - } - int allSatisfiedTableIndex = getAllSatisfiedTableIndex(tsFileMetadata); - if (allSatisfiedTableIndex > 0) { - // size of tsfile - size of (tsfile magic string + version number + all metadata + metadata - // marker) - resultArr[allSatisfiedTableIndex] += - (tsFileResource.getTsFileSize() - - reader.getAllMetadataSize() - - 1 - - TSFileConfig.MAGIC_STRING.getBytes().length - - 1); - return; - } - calculateDiskUsageInBytesByOffset(reader); - } catch (Exception e) { - logger.error("Failed to scan file {}", tsFileResource.getTsFile().getAbsolutePath(), e); + int allSatisfiedTableIndex = getAllSatisfiedTableIndex(tsFileMetadata); + if (allSatisfiedTableIndex > 0) { + // size of tsfile - size of (tsfile magic string + version number + all metadata + metadata + // marker) + resultArr[allSatisfiedTableIndex] += + (tsFileResource.getTsFileSize() + - reader.getAllMetadataSize() + - 1 + - TSFileConfig.MAGIC_STRING.getBytes().length + - 1); + return; } + calculateDiskUsageInBytesByOffset(reader); } private boolean hasSatisfiedData(TsFileMetadata tsFileMetadata) { @@ -143,7 +141,8 @@ public class TableDiskUsageStatisticUtil extends DiskUsageStatisticUtil { tableName, k -> { try { - TsFileDeviceIterator deviceIterator = 
reader.getTableDevicesIteratorWithIsAligned(k); + TsFileDeviceIterator deviceIterator = + reader.getTableDevicesIteratorWithIsAligned(k, timeSeriesMetadataIoSizeRecorder); Pair<IDeviceID, Boolean> pair = deviceIterator.next(); return calculateStartOffsetOfChunkGroup( reader, deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), pair); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java index 54687edda36..ae1c47b03e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.utils; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -34,6 +35,7 @@ import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.RamUsageEstimator; import java.io.IOException; +import java.util.Optional; public class TreeDiskUsageStatisticUtil extends DiskUsageStatisticUtil { @@ -45,8 +47,11 @@ public class TreeDiskUsageStatisticUtil extends DiskUsageStatisticUtil { private long result; public TreeDiskUsageStatisticUtil( - TsFileManager tsFileManager, long timePartition, PartialPath pathPattern) { - super(tsFileManager, timePartition); + TsFileManager tsFileManager, + long timePartition, 
+ PartialPath pathPattern, + Optional<FragmentInstanceContext> context) { + super(tsFileManager, timePartition, context); this.pathPattern = pathPattern; this.result = 0; String[] nodes = pathPattern.getNodes(); @@ -71,44 +76,36 @@ public class TreeDiskUsageStatisticUtil extends DiskUsageStatisticUtil { } @Override - public void calculateNextFile() { - TsFileResource tsFileResource = iterator.next(); - if (tsFileResource.isDeleted()) { - return; - } - - try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFileResource.getTsFilePath())) { - TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned(); + protected void calculateNextFile(TsFileResource tsFileResource, TsFileSequenceReader reader) + throws IOException, IllegalPathException { + TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned(); + while (deviceIterator.hasNext()) { + Pair<IDeviceID, Boolean> deviceIsAlignedPair = deviceIterator.next(); + if (!matchPathPattern(deviceIsAlignedPair.getLeft())) { + continue; + } + MetadataIndexNode nodeOfFirstMatchedDevice = + deviceIterator.getFirstMeasurementNodeOfCurrentDevice(); + Pair<IDeviceID, Boolean> nextNotMatchedDevice = null; + MetadataIndexNode nodeOfNextNotMatchedDevice = null; while (deviceIterator.hasNext()) { - Pair<IDeviceID, Boolean> deviceIsAlignedPair = deviceIterator.next(); - if (!matchPathPattern(deviceIsAlignedPair.getLeft())) { - continue; - } - MetadataIndexNode nodeOfFirstMatchedDevice = - deviceIterator.getFirstMeasurementNodeOfCurrentDevice(); - Pair<IDeviceID, Boolean> nextNotMatchedDevice = null; - MetadataIndexNode nodeOfNextNotMatchedDevice = null; - while (deviceIterator.hasNext()) { - Pair<IDeviceID, Boolean> currentDevice = deviceIterator.next(); - if (!matchPathPattern(currentDevice.getLeft())) { - nextNotMatchedDevice = currentDevice; - nodeOfNextNotMatchedDevice = deviceIterator.getFirstMeasurementNodeOfCurrentDevice(); - break; - } - } - result += - 
calculatePathPatternSize( - reader, - deviceIsAlignedPair, - nodeOfFirstMatchedDevice, - nextNotMatchedDevice, - nodeOfNextNotMatchedDevice); - if (isMatchedDeviceSequential) { + Pair<IDeviceID, Boolean> currentDevice = deviceIterator.next(); + if (!matchPathPattern(currentDevice.getLeft())) { + nextNotMatchedDevice = currentDevice; + nodeOfNextNotMatchedDevice = deviceIterator.getFirstMeasurementNodeOfCurrentDevice(); break; } } - } catch (Exception e) { - logger.error("Failed to scan file {}", tsFileResource.getTsFile().getAbsolutePath(), e); + result += + calculatePathPatternSize( + reader, + deviceIsAlignedPair, + nodeOfFirstMatchedDevice, + nextNotMatchedDevice, + nodeOfNextNotMatchedDevice); + if (isMatchedDeviceSequential) { + break; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 39fd680f11a..45aee62f3b0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -970,7 +970,7 @@ public abstract class TVList implements WALEntryValue { TSDataType dataType = getDataType(); int maxRowCountOfCurrentBatch = Math.min( - paginationController.hasLimit() + paginationController.hasSetLimit() ? 
(int) paginationController.getCurLimit() : Integer.MAX_VALUE, Math.min(maxNumberOfPointsInPage, rows - index)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/LastQueryScanNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/PlanNodeSerdeTest.java similarity index 75% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/LastQueryScanNodeSerdeTest.java rename to iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/PlanNodeSerdeTest.java index 68a7fa0df0d..14f751b69bf 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/LastQueryScanNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/PlanNodeSerdeTest.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.plan.planner.node.PlanNodeDeserializeHelper; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -34,9 +35,9 @@ import java.util.Arrays; import static org.junit.Assert.assertEquals; -public class LastQueryScanNodeSerdeTest { +public class PlanNodeSerdeTest { @Test - public void test() throws IllegalPathException { + public void testLastQueryScanNode() throws IllegalPathException { LastQueryScanNode node = new LastQueryScanNode( new PlanNodeId("test"), @@ -69,4 +70,20 @@ public class LastQueryScanNodeSerdeTest { byteBuffer.flip(); assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), node); } + + @Test + public void testShowDiskUsageNode() 
throws IllegalPathException { + ShowDiskUsageNode node = + new ShowDiskUsageNode(new PlanNodeId("test"), null, new PartialPath("root.test.d1")); + + ByteBuffer byteBuffer = ByteBuffer.allocate(2048); + node.serialize(byteBuffer); + byteBuffer.flip(); + assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), node); + node = new ShowDiskUsageNode(new PlanNodeId("test"), null, new PartialPath("root.test.d1")); + byteBuffer = ByteBuffer.allocate(2048); + node.serialize(byteBuffer); + byteBuffer.flip(); + assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), node); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java index 25c114f5e8f..71c0a9cde3e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java @@ -617,7 +617,10 @@ public class ColumnHeaderConstant { public static final List<ColumnHeader> showDiskUsageColumnHeaders = ImmutableList.of( + new ColumnHeader(DATABASE, TSDataType.TEXT), new ColumnHeader(DATA_NODE_ID, TSDataType.INT32), + new ColumnHeader(REGION_ID, TSDataType.INT32), + new ColumnHeader(TIME_PARTITION, TSDataType.INT64), new ColumnHeader(SIZE_IN_BYTES, TSDataType.INT64)); public static final List<ColumnHeader> showSpaceQuotaColumnHeaders = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java index 4a1200c33d7..ff37aecd2d3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java @@ 
-414,10 +414,6 @@ public class InformationSchema { return columnsThatSupportPushDownPredicate.containsKey(tableName); } - public static boolean hasUnlimitedQueryTimeOut(String tableName) { - return tableName.equals(TABLE_DISK_USAGE); - } - private InformationSchema() { // Utils } diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index 8be427cab93..23345008033 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -137,7 +137,6 @@ statement | setSystemStatusStatement | showVersionStatement | showQueriesStatement - | showDiskUsageStatement | killQueryStatement | loadConfigurationStatement | setConfigurationStatement @@ -640,10 +639,6 @@ showQueriesStatement limitOffsetClause ; -showDiskUsageStatement - : SHOW DISK_USAGE FROM (tableName=qualifiedName) - ; - killQueryStatement : KILL (QUERY queryId=string | ALL QUERIES) ; @@ -1400,7 +1395,7 @@ nonReserved : ABSENT | ADD | ADMIN | AFTER | ALL | ANALYZE | ANY | ARRAY | ASC | AT | ATTRIBUTE | AUDIT | AUTHORIZATION | BEGIN | BERNOULLI | BOTH | CACHE | CALL | CALLED | CASCADE | CATALOG | CATALOGS | CHAR | CHARACTER | CHARSET | CLEAR | CLUSTER | CLUSTERID | COLUMN | COLUMNS | COMMENT | COMMIT | COMMITTED | CONDITION | CONDITIONAL | CONFIGNODES | CONFIGNODE | CONFIGURATION | CONNECTOR | CONSTANT | COPARTITION | COUNT | CURRENT - | DATA | DATABASE | DATABASES | DATANODE | DATANODES | DATASET | DATE | DAY | DECLARE | DEFAULT | DEFINE | DEFINER | DENY | DESC | DESCRIPTOR | DETAILS| DETERMINISTIC | DEVICES | DISK_USAGE | DISTRIBUTED | DO | DOUBLE + | DATA | DATABASE | DATABASES | DATANODE | DATANODES | DATASET | DATE | DAY | DECLARE | DEFAULT | DEFINE | DEFINER | DENY | DESC | 
DESCRIPTOR | DETAILS| DETERMINISTIC | DEVICES | DISTRIBUTED | DO | DOUBLE | ELSEIF | EMPTY | ENCODING | ERROR | EXCLUDING | EXPLAIN | EXTRACTOR | FETCH | FIELD | FILTER | FINAL | FIRST | FLUSH | FOLLOWING | FORMAT | FUNCTION | FUNCTIONS | GRACE | GRANT | GRANTED | GRANTS | GRAPHVIZ | GROUPS @@ -1517,7 +1512,6 @@ DESCRIPTOR: 'DESCRIPTOR'; DETAILS: 'DETAILS'; DETERMINISTIC: 'DETERMINISTIC'; DEVICES: 'DEVICES'; -DISK_USAGE: 'DISK_USAGE'; DISTINCT: 'DISTINCT'; DISTRIBUTED: 'DISTRIBUTED'; DO: 'DO';
