This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch fe_memory_control in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 96318a2c7fbd73e2705572d839ecb030a416760c Author: lancelly <[email protected]> AuthorDate: Sat May 18 10:52:17 2024 +0800 first version --- .../db/queryengine/common/MPPQueryContext.java | 32 +++++ .../exception/MemoryNotEnoughException.java | 7 +- .../execution/MemoryEstimationHelper.java | 6 +- .../iotdb/db/queryengine/plan/Coordinator.java | 3 + .../queryengine/plan/analyze/AnalyzeVisitor.java | 154 ++++++++++++++------- .../plan/analyze/ConcatPathRewriter.java | 33 +++-- .../plan/analyze/ExpressionAnalyzer.java | 40 ++++-- .../queryengine/plan/analyze/ExpressionUtils.java | 66 ++++++--- .../queryengine/plan/analyze/TemplatedAnalyze.java | 8 +- .../db/queryengine/plan/expression/Expression.java | 3 +- .../plan/expression/binary/BinaryExpression.java | 12 ++ .../plan/expression/leaf/ConstantOperand.java | 9 ++ .../plan/expression/leaf/NullOperand.java | 10 ++ .../plan/expression/leaf/TimeSeriesOperand.java | 10 ++ .../plan/expression/leaf/TimestampOperand.java | 10 ++ .../plan/expression/multi/FunctionExpression.java | 26 ++++ .../expression/other/CaseWhenThenExpression.java | 16 +++ .../expression/other/GroupByTimeExpression.java | 9 ++ .../plan/expression/ternary/BetweenExpression.java | 14 ++ .../plan/expression/unary/InExpression.java | 12 +- .../plan/expression/unary/IsNullExpression.java | 10 ++ .../plan/expression/unary/LikeExpression.java | 12 ++ .../plan/expression/unary/LogicNotExpression.java | 11 ++ .../plan/expression/unary/NegationExpression.java | 11 ++ .../plan/expression/unary/RegularExpression.java | 12 ++ .../cartesian/BindSchemaForExpressionVisitor.java | 50 +++++-- .../cartesian/BindSchemaForPredicateVisitor.java | 43 ++++-- .../visitor/cartesian/CartesianProductVisitor.java | 23 +-- ...catDeviceAndBindSchemaForExpressionVisitor.java | 27 +++- ...ncatDeviceAndBindSchemaForPredicateVisitor.java | 30 +++- .../ConcatExpressionWithSuffixPathsVisitor.java | 28 +++- .../visitor/cartesian/QueryContextProvider.java} | 19 +-- 
.../plan/optimization/AggregationPushDown.java | 43 ++++-- .../plan/planner/LocalExecutionPlanner.java | 22 ++- .../plan/planner/LogicalPlanBuilder.java | 76 ++++++---- .../distribution/DistributionPlanContext.java | 6 + .../plan/planner/distribution/SourceRewriter.java | 9 ++ .../plan/node/source/AlignedLastQueryScanNode.java | 13 ++ .../source/AlignedSeriesAggregationScanNode.java | 11 ++ .../plan/node/source/AlignedSeriesScanNode.java | 12 ++ .../plan/node/source/LastQueryScanNode.java | 13 ++ .../node/source/SeriesAggregationScanNode.java | 12 ++ .../planner/plan/node/source/SeriesScanNode.java | 12 ++ .../planner/plan/node/source/SeriesSourceNode.java | 4 +- .../apache/iotdb/db/utils/ErrorHandlingUtils.java | 3 + .../plan/analyze/ExpressionAnalyzerTest.java | 8 +- 46 files changed, 789 insertions(+), 211 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 1cc2f8663da..533cb651d31 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.queryengine.statistics.QueryPlanStatistics; import org.apache.tsfile.read.filter.basic.Filter; @@ -75,6 +76,18 @@ public class MPPQueryContext { QueryPlanStatistics queryPlanStatistics = null; + // To avoid query front-end from consuming too much memory, it needs to reserve memory when + // constructing some Expression and 
PlanNode. + private long reservedBytesInTotalForFrontEnd = 0; + + private long bytesToBeReservedForFrontEnd = 0; + + // To avoid reserving memory too frequently, we choose to do it in batches. This is upper limit + // for each batch. + private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L; + + private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER = LocalExecutionPlanner.getInstance(); + public MPPQueryContext(QueryId queryId) { this.queryId = queryId; this.endPointBlackList = new LinkedList<>(); @@ -290,4 +303,23 @@ public class MPPQueryContext { } queryPlanStatistics.setLogicalOptimizationCost(logicalOptimizeCost); } + + /** + * This method does not require concurrency control because the query plan is generated in a + * single-threaded manner. + */ + public void reserveMemoryForFrontEnd(final long bytes) { + bytesToBeReservedForFrontEnd += bytes; + if (bytesToBeReservedForFrontEnd >= MEMORY_BATCH_THRESHOLD) { + LOCAL_EXECUTION_PLANNER.reserveMemoryForQueryFrontEnd( + bytesToBeReservedForFrontEnd, reservedBytesInTotalForFrontEnd, queryId.getId()); + reservedBytesInTotalForFrontEnd += bytesToBeReservedForFrontEnd; + bytesToBeReservedForFrontEnd = 0; + } + } + + public void releaseMemoryForFrontEnd() { + LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotalForFrontEnd); + reservedBytesInTotalForFrontEnd = 0; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java index 5310a45db97..c0911254cb7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java @@ -19,12 +19,9 @@ package org.apache.iotdb.db.queryengine.exception; -import org.apache.iotdb.commons.exception.IoTDBException; 
-import org.apache.iotdb.rpc.TSStatusCode; - -public class MemoryNotEnoughException extends IoTDBException { +public class MemoryNotEnoughException extends RuntimeException { public MemoryNotEnoughException(String message) { - super(message, TSStatusCode.QUOTA_MEM_QUERY_NOT_ENOUGH.getStatusCode(), true); + super(message); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java index 85f13bfb779..a18e2dbc58b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java @@ -76,8 +76,10 @@ public class MemoryEstimationHelper { totalSize += MEASUREMENT_PATH_INSTANCE_SIZE; MeasurementPath measurementPath = (MeasurementPath) partialPath; totalSize += RamUsageEstimator.sizeOf(measurementPath.getMeasurementAlias()); - totalSize += - RamUsageEstimator.sizeOf(measurementPath.getMeasurementSchema().getMeasurementId()); + if (measurementPath.getMeasurementSchema() != null) { + totalSize += + RamUsageEstimator.sizeOf(measurementPath.getMeasurementSchema().getMeasurementId()); + } } else { totalSize += PARTIAL_PATH_INSTANCE_SIZE; totalSize += RamUsageEstimator.sizeOf(partialPath.getMeasurement()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 3b406bdb48c..65dcd830195 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -141,6 +141,9 @@ public class Coordinator { } return result; } finally { + if (queryContext != null) { + 
queryContext.releaseMemoryForFrontEnd(); + } if (queryContext != null && !queryContext.getAcquiredLockNumMap().isEmpty()) { Map<SchemaLockType, Integer> lockMap = queryContext.getAcquiredLockNumMap(); for (Map.Entry<SchemaLockType, Integer> entry : lockMap.entrySet()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index a7448a93529..94ec9a682cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -291,20 +291,21 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList, queryStatement); } - outputExpressions = analyzeSelect(analysis, queryStatement, schemaTree, deviceList); + outputExpressions = + analyzeSelect(analysis, queryStatement, schemaTree, deviceList, context); if (outputExpressions.isEmpty()) { return finishQuery(queryStatement, analysis); } - analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList); + analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList, context); if (deviceList.isEmpty()) { return finishQuery(queryStatement, analysis, outputExpressions); } analysis.setDeviceList(deviceList); - analyzeDeviceToGroupBy(analysis, queryStatement, schemaTree, deviceList); - analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree, deviceList); - analyzeHaving(analysis, queryStatement, schemaTree, deviceList); + analyzeDeviceToGroupBy(analysis, queryStatement, schemaTree, deviceList, context); + analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree, deviceList, context); + analyzeHaving(analysis, queryStatement, schemaTree, deviceList, context); analyzeDeviceToAggregation(analysis, queryStatement); 
analyzeDeviceToSourceTransform(analysis, queryStatement); @@ -321,22 +322,25 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> new GroupByLevelHelper(queryStatement.getGroupByLevelComponent().getLevels()); outputExpressions = - analyzeGroupByLevelSelect(analysis, queryStatement, schemaTree, groupByLevelHelper); + analyzeGroupByLevelSelect( + analysis, queryStatement, schemaTree, groupByLevelHelper, context); if (outputExpressions.isEmpty()) { return finishQuery(queryStatement, analysis); } analysis.setOutputExpressions(outputExpressions); setSelectExpressions(analysis, queryStatement, outputExpressions); - analyzeGroupByLevelHaving(analysis, queryStatement, schemaTree, groupByLevelHelper); + analyzeGroupByLevelHaving( + analysis, queryStatement, schemaTree, groupByLevelHelper, context); - analyzeGroupByLevelOrderBy(analysis, queryStatement, schemaTree, groupByLevelHelper); + analyzeGroupByLevelOrderBy( + analysis, queryStatement, schemaTree, groupByLevelHelper, context); checkDataTypeConsistencyInGroupByLevel( analysis, groupByLevelHelper.getGroupByLevelExpressions()); analysis.setCrossGroupByExpressions(groupByLevelHelper.getGroupByLevelExpressions()); } else { - outputExpressions = analyzeSelect(analysis, queryStatement, schemaTree); + outputExpressions = analyzeSelect(analysis, queryStatement, schemaTree, context); analyzeGroupByTag(analysis, queryStatement, outputExpressions); @@ -346,17 +350,17 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> analysis.setOutputExpressions(outputExpressions); setSelectExpressions(analysis, queryStatement, outputExpressions); - analyzeHaving(analysis, queryStatement, schemaTree); + analyzeHaving(analysis, queryStatement, schemaTree, context); - analyzeOrderBy(analysis, queryStatement, schemaTree); + analyzeOrderBy(analysis, queryStatement, schemaTree, context); } // analyze aggregation analyzeAggregation(analysis, queryStatement); // analyze aggregation input - 
analyzeGroupBy(analysis, queryStatement, schemaTree); - analyzeWhere(analysis, queryStatement, schemaTree); + analyzeGroupBy(analysis, queryStatement, schemaTree, context); + analyzeWhere(analysis, queryStatement, schemaTree, context); if (analysis.getWhereExpression() != null && analysis.getWhereExpression().equals(ConstantOperand.FALSE)) { return finishQuery(queryStatement, analysis, outputExpressions); @@ -395,7 +399,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> queryStatement = (QueryStatement) concatPathRewriter.rewrite( - queryStatement, new PathPatternTree(queryStatement.useWildcard())); + queryStatement, new PathPatternTree(queryStatement.useWildcard()), context); analysis.setStatement(queryStatement); // request schema fetch API @@ -495,7 +499,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) { selectExpressions.add(resultColumn.getExpression()); } - analyzeLastSource(analysis, selectExpressions, schemaTree); + analyzeLastSource(analysis, selectExpressions, schemaTree, context); analysis.setRespDatasetHeader(DatasetHeaderFactory.getLastQueryHeader()); @@ -506,14 +510,17 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> } private void analyzeLastSource( - Analysis analysis, List<Expression> selectExpressions, ISchemaTree schemaTree) { + Analysis analysis, + List<Expression> selectExpressions, + ISchemaTree schemaTree, + MPPQueryContext context) { Set<Expression> sourceExpressions = new LinkedHashSet<>(); Set<Expression> lastQueryBaseExpressions = new LinkedHashSet<>(); Map<Expression, List<Expression>> lastQueryNonWritableViewSourceExpressionMap = null; for (Expression selectExpression : selectExpressions) { for (Expression lastQuerySourceExpression : - bindSchemaForExpression(selectExpression, schemaTree)) { + bindSchemaForExpression(selectExpression, schemaTree, 
context)) { if (lastQuerySourceExpression instanceof TimeSeriesOperand) { lastQueryBaseExpressions.add(lastQuerySourceExpression); sourceExpressions.add(lastQuerySourceExpression); @@ -584,7 +591,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - GroupByLevelHelper groupByLevelHelper) { + GroupByLevelHelper groupByLevelHelper, + MPPQueryContext queryContext) { Map<Integer, Set<Pair<Expression, String>>> outputExpressionMap = new HashMap<>(); int columnIndex = 0; @@ -592,7 +600,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Set<Pair<Expression, String>> outputExpressionSet = new LinkedHashSet<>(); List<Expression> resultExpressions = - bindSchemaForExpression(resultColumn.getExpression(), schemaTree); + bindSchemaForExpression(resultColumn.getExpression(), schemaTree, queryContext); boolean isCountStar = resultColumn.getExpression().getExpressionType().equals(ExpressionType.FUNCTION) && ((FunctionExpression) resultColumn.getExpression()).isCountStar(); @@ -640,7 +648,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> /** process select component for align by time. 
*/ private List<Pair<Expression, String>> analyzeSelect( - Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree) { + Analysis analysis, + QueryStatement queryStatement, + ISchemaTree schemaTree, + MPPQueryContext queryContext) { Map<Integer, List<Pair<Expression, String>>> outputExpressionMap = new HashMap<>(); ColumnPaginationController paginationController = @@ -654,7 +665,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) { List<Pair<Expression, String>> outputExpressions = new ArrayList<>(); List<Expression> resultExpressions = - bindSchemaForExpression(resultColumn.getExpression(), schemaTree); + bindSchemaForExpression(resultColumn.getExpression(), schemaTree, queryContext); for (Expression resultExpression : resultExpressions) { if (paginationController.hasCurOffset()) { @@ -710,7 +721,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - List<PartialPath> deviceList) { + List<PartialPath> deviceList, + MPPQueryContext queryContext) { List<Pair<Expression, String>> outputExpressions = new ArrayList<>(); Map<String, Set<Expression>> deviceToSelectExpressions = new HashMap<>(); ColumnPaginationController paginationController = @@ -726,7 +738,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> new LinkedHashMap<>(); for (PartialPath device : deviceList) { List<Expression> selectExpressionsOfOneDevice = - concatDeviceAndBindSchemaForExpression(selectExpression, device, schemaTree); + concatDeviceAndBindSchemaForExpression( + selectExpression, device, schemaTree, queryContext); if (selectExpressionsOfOneDevice.isEmpty()) { continue; } @@ -863,14 +876,16 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Analysis analysis, QueryStatement queryStatement, 
ISchemaTree schemaTree, - UnaryOperator<Expression> havingExpressionAnalyzer) { + UnaryOperator<Expression> havingExpressionAnalyzer, + MPPQueryContext queryContext) { // get removeWildcard Expressions in Having List<Expression> conJunctions = ExpressionAnalyzer.bindSchemaForPredicate( queryStatement.getHavingCondition().getPredicate(), queryStatement.getFromComponent().getPrefixPaths(), schemaTree, - true); + true, + queryContext); Expression havingExpression = PredicateUtils.combineConjuncts( conJunctions.stream().distinct().collect(Collectors.toList())); @@ -888,20 +903,28 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> } private void analyzeHaving( - Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree) { + Analysis analysis, + QueryStatement queryStatement, + ISchemaTree schemaTree, + MPPQueryContext queryContext) { if (!queryStatement.hasHaving()) { return; } analyzeHavingBase( - analysis, queryStatement, schemaTree, ExpressionAnalyzer::normalizeExpression); + analysis, + queryStatement, + schemaTree, + ExpressionAnalyzer::normalizeExpression, + queryContext); } private void analyzeGroupByLevelHaving( Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - GroupByLevelHelper groupByLevelHelper) { + GroupByLevelHelper groupByLevelHelper, + MPPQueryContext queryContext) { if (!queryStatement.hasHaving()) { return; } @@ -912,7 +935,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> schemaTree, havingExpression -> PredicateUtils.removeDuplicateConjunct( - groupByLevelHelper.applyLevels(havingExpression, analysis))); + groupByLevelHelper.applyLevels(havingExpression, analysis)), + queryContext); // update groupByLevelExpressions groupByLevelHelper.updateGroupByLevelExpressions(analysis.getHavingExpression()); } @@ -921,7 +945,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Analysis analysis, QueryStatement 
queryStatement, ISchemaTree schemaTree, - List<PartialPath> deviceSet) { + List<PartialPath> deviceSet, + MPPQueryContext queryContext) { if (!queryStatement.hasHaving()) { return; } @@ -937,7 +962,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> for (PartialPath device : deviceSet) { List<Expression> expressionsInHaving = - concatDeviceAndBindSchemaForExpression(havingExpression, device, schemaTree); + concatDeviceAndBindSchemaForExpression( + havingExpression, device, schemaTree, queryContext); conJunctions.addAll( expressionsInHaving.stream() @@ -1341,7 +1367,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - List<PartialPath> deviceSet) { + List<PartialPath> deviceSet, + MPPQueryContext queryContext) { if (!queryStatement.hasWhere()) { return; } @@ -1352,7 +1379,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> while (deviceIterator.hasNext()) { PartialPath devicePath = deviceIterator.next(); Expression whereExpression = - analyzeWhereSplitByDevice(queryStatement, devicePath, schemaTree); + analyzeWhereSplitByDevice(queryStatement, devicePath, schemaTree, queryContext); if (whereExpression.equals(ConstantOperand.FALSE)) { deviceIterator.remove(); } else if (whereExpression.equals(ConstantOperand.TRUE)) { @@ -1371,7 +1398,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> } private void analyzeWhere( - Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree) { + Analysis analysis, + QueryStatement queryStatement, + ISchemaTree schemaTree, + MPPQueryContext queryContext) { if (!queryStatement.hasWhere()) { return; } @@ -1380,7 +1410,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> queryStatement.getWhereCondition().getPredicate(), queryStatement.getFromComponent().getPrefixPaths(), schemaTree, - 
true); + true, + queryContext); Expression whereExpression = convertConJunctionsToWhereExpression(conJunctions); if (whereExpression.equals(ConstantOperand.TRUE)) { analysis.setWhereExpression(null); @@ -1396,10 +1427,17 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> } private Expression analyzeWhereSplitByDevice( - QueryStatement queryStatement, PartialPath devicePath, ISchemaTree schemaTree) { + final QueryStatement queryStatement, + final PartialPath devicePath, + final ISchemaTree schemaTree, + final MPPQueryContext queryContext) { List<Expression> conJunctions = ExpressionAnalyzer.concatDeviceAndBindSchemaForPredicate( - queryStatement.getWhereCondition().getPredicate(), devicePath, schemaTree, true); + queryStatement.getWhereCondition().getPredicate(), + devicePath, + schemaTree, + true, + queryContext); return convertConJunctionsToWhereExpression(conJunctions); } @@ -1568,11 +1606,13 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - UnaryOperator<List<Expression>> orderByExpressionAnalyzer) { + UnaryOperator<List<Expression>> orderByExpressionAnalyzer, + MPPQueryContext queryContext) { Set<Expression> orderByExpressions = new LinkedHashSet<>(); for (Expression expressionForItem : queryStatement.getExpressionSortItemList()) { // Expression in a sortItem only indicates one column - List<Expression> expressions = bindSchemaForExpression(expressionForItem, schemaTree); + List<Expression> expressions = + bindSchemaForExpression(expressionForItem, schemaTree, queryContext); if (expressions.isEmpty()) { throw new SemanticException( String.format( @@ -1600,19 +1640,24 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> } private void analyzeOrderBy( - Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree) { + Analysis analysis, + QueryStatement queryStatement, + ISchemaTree 
schemaTree, + MPPQueryContext queryContext) { if (!queryStatement.hasOrderByExpression()) { return; } - analyzeOrderByBase(analysis, queryStatement, schemaTree, expressions -> expressions); + analyzeOrderByBase( + analysis, queryStatement, schemaTree, expressions -> expressions, queryContext); } private void analyzeGroupByLevelOrderBy( Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - GroupByLevelHelper groupByLevelHelper) { + GroupByLevelHelper groupByLevelHelper, + MPPQueryContext queryContext) { if (!queryStatement.hasOrderByExpression()) { return; } @@ -1627,7 +1672,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> groupedExpressions.add(groupByLevelHelper.applyLevels(expression, analysis)); } return new ArrayList<>(groupedExpressions); - }); + }, + queryContext); // update groupByLevelExpressions for (Expression orderByExpression : analysis.getOrderByExpressions()) { groupByLevelHelper.updateGroupByLevelExpressions(orderByExpression); @@ -1642,7 +1688,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - List<PartialPath> deviceSet) { + List<PartialPath> deviceSet, + MPPQueryContext queryContext) { if (queryStatement.getGroupByComponent() == null) { return; } @@ -1654,7 +1701,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Expression expression = groupByComponent.getControlColumnExpression(); for (PartialPath device : deviceSet) { List<Expression> groupByExpressionsOfOneDevice = - concatDeviceAndBindSchemaForExpression(expression, device, schemaTree); + concatDeviceAndBindSchemaForExpression(expression, device, schemaTree, queryContext); if (groupByExpressionsOfOneDevice.isEmpty()) { throw new SemanticException( @@ -1713,7 +1760,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Analysis analysis, QueryStatement 
queryStatement, ISchemaTree schemaTree, - List<PartialPath> deviceSet) { + List<PartialPath> deviceSet, + MPPQueryContext queryContext) { if (!queryStatement.hasOrderByExpression()) { return; } @@ -1726,7 +1774,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Set<Expression> orderByExpressionsForOneDevice = new LinkedHashSet<>(); for (Expression expressionForItem : queryStatement.getExpressionSortItemList()) { List<Expression> expressions = - concatDeviceAndBindSchemaForExpression(expressionForItem, device, schemaTree); + concatDeviceAndBindSchemaForExpression( + expressionForItem, device, schemaTree, queryContext); if (expressions.isEmpty()) { throw new SemanticException( String.format( @@ -1763,7 +1812,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> } private void analyzeGroupBy( - Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree) { + Analysis analysis, + QueryStatement queryStatement, + ISchemaTree schemaTree, + MPPQueryContext queryContext) { if (queryStatement.getGroupByComponent() == null) { return; @@ -1775,7 +1827,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> if (queryStatement.hasGroupByExpression()) { groupByExpression = groupByComponent.getControlColumnExpression(); // Expression in group by variation clause only indicates one column - List<Expression> expressions = bindSchemaForExpression(groupByExpression, schemaTree); + List<Expression> expressions = + bindSchemaForExpression(groupByExpression, schemaTree, queryContext); if (expressions.isEmpty()) { throw new SemanticException( String.format( @@ -2891,7 +2944,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> analysis, Collections.singletonList( new TimeSeriesOperand(showTimeSeriesStatement.getPathPattern())), - schemaTree); + schemaTree, + context); analyzeDataPartition(analysis, new QueryStatement(), schemaTree, context); } diff 
--git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ConcatPathRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ConcatPathRewriter.java index 7ac927ac643..041f08e1b5c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ConcatPathRewriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ConcatPathRewriter.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.path.PathPatternTreeUtils; import org.apache.iotdb.db.exception.sql.StatementAnalyzeException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.component.ResultColumn; @@ -48,7 +49,8 @@ public class ConcatPathRewriter { return patternTree; } - public Statement rewrite(Statement statement, PathPatternTree patternTree) + public Statement rewrite( + Statement statement, PathPatternTree patternTree, MPPQueryContext queryContext) throws StatementAnalyzeException { QueryStatement queryStatement = (QueryStatement) statement; this.patternTree = patternTree; @@ -75,7 +77,7 @@ public class ConcatPathRewriter { } else { // concat SELECT with FROM List<ResultColumn> resultColumns = - concatSelectWithFrom(queryStatement.getSelectComponent(), prefixPaths); + concatSelectWithFrom(queryStatement.getSelectComponent(), prefixPaths, queryContext); queryStatement.getSelectComponent().setResultColumns(resultColumns); // concat GROUP BY with FROM @@ -85,12 +87,13 @@ public class ConcatPathRewriter { .setControlColumnExpression( contactGroupByWithFrom( queryStatement.getGroupByComponent().getControlColumnExpression(), - prefixPaths)); + prefixPaths, + queryContext)); } if 
(queryStatement.hasOrderByExpression()) { List<Expression> sortItemExpressions = queryStatement.getExpressionSortItemList(); sortItemExpressions.replaceAll( - expression -> contactOrderByWithFrom(expression, prefixPaths)); + expression -> contactOrderByWithFrom(expression, prefixPaths, queryContext)); } } @@ -119,14 +122,16 @@ public class ConcatPathRewriter { * path pattern. And construct pattern tree. */ private List<ResultColumn> concatSelectWithFrom( - SelectComponent selectComponent, List<PartialPath> prefixPaths) + final SelectComponent selectComponent, + final List<PartialPath> prefixPaths, + final MPPQueryContext queryContext) throws StatementAnalyzeException { // resultColumns after concat List<ResultColumn> resultColumns = new ArrayList<>(); for (ResultColumn resultColumn : selectComponent.getResultColumns()) { List<Expression> resultExpressions = ExpressionAnalyzer.concatExpressionWithSuffixPaths( - resultColumn.getExpression(), prefixPaths, patternTree); + resultColumn.getExpression(), prefixPaths, patternTree, queryContext); for (Expression resultExpression : resultExpressions) { resultColumns.add( new ResultColumn( @@ -136,18 +141,26 @@ public class ConcatPathRewriter { return resultColumns; } - private Expression contactGroupByWithFrom(Expression expression, List<PartialPath> prefixPaths) { + private Expression contactGroupByWithFrom( + final Expression expression, + final List<PartialPath> prefixPaths, + final MPPQueryContext queryContext) { List<Expression> resultExpressions = - ExpressionAnalyzer.concatExpressionWithSuffixPaths(expression, prefixPaths, patternTree); + ExpressionAnalyzer.concatExpressionWithSuffixPaths( + expression, prefixPaths, patternTree, queryContext); if (resultExpressions.size() != 1) { throw new IllegalStateException("Expression in group by should indicate one value"); } return resultExpressions.get(0); } - private Expression contactOrderByWithFrom(Expression expression, List<PartialPath> prefixPaths) { + private Expression 
contactOrderByWithFrom( + final Expression expression, + final List<PartialPath> prefixPaths, + final MPPQueryContext queryContext) { List<Expression> resultExpressions = - ExpressionAnalyzer.concatExpressionWithSuffixPaths(expression, prefixPaths, patternTree); + ExpressionAnalyzer.concatExpressionWithSuffixPaths( + expression, prefixPaths, patternTree, queryContext); if (resultExpressions.size() != 1) { throw new IllegalStateException("Expression in order by should indicate one value"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java index 010cf8fcd01..9219a3d45b0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction; import org.apache.iotdb.commons.udf.builtin.BuiltinTimeSeriesGeneratingFunction; import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.expression.Expression; @@ -264,11 +265,15 @@ public class ExpressionAnalyzer { * @return the concatenated expression list */ public static List<Expression> concatExpressionWithSuffixPaths( - Expression expression, List<PartialPath> prefixPaths, PathPatternTree patternTree) { + final Expression expression, + final List<PartialPath> prefixPaths, + final PathPatternTree patternTree, + final MPPQueryContext queryContext) { return new ConcatExpressionWithSuffixPathsVisitor() .process( expression, - 
new ConcatExpressionWithSuffixPathsVisitor.Context(prefixPaths, patternTree)); + new ConcatExpressionWithSuffixPathsVisitor.Context( + prefixPaths, patternTree, queryContext)); } /** @@ -405,8 +410,9 @@ public class ExpressionAnalyzer { * expressions */ public static List<Expression> bindSchemaForExpression( - Expression expression, ISchemaTree schemaTree) { - return new BindSchemaForExpressionVisitor().process(expression, schemaTree); + Expression expression, ISchemaTree schemaTree, MPPQueryContext queryContext) { + return new BindSchemaForExpressionVisitor() + .process(expression, new BindSchemaForExpressionVisitor.Context(schemaTree, queryContext)); } /** @@ -419,10 +425,16 @@ public class ExpressionAnalyzer { * @return the expression list with full path and after binding schema */ public static List<Expression> bindSchemaForPredicate( - Expression predicate, List<PartialPath> prefixPaths, ISchemaTree schemaTree, boolean isRoot) { + Expression predicate, + List<PartialPath> prefixPaths, + ISchemaTree schemaTree, + boolean isRoot, + MPPQueryContext queryContext) { return new BindSchemaForPredicateVisitor() .process( - predicate, new BindSchemaForPredicateVisitor.Context(prefixPaths, schemaTree, isRoot)); + predicate, + new BindSchemaForPredicateVisitor.Context( + prefixPaths, schemaTree, isRoot, queryContext)); } public static Expression replaceRawPathWithGroupedPath( @@ -445,11 +457,15 @@ public class ExpressionAnalyzer { * @return expression list with full path and after binding schema */ public static List<Expression> concatDeviceAndBindSchemaForExpression( - Expression expression, PartialPath devicePath, ISchemaTree schemaTree) { + final Expression expression, + final PartialPath devicePath, + final ISchemaTree schemaTree, + final MPPQueryContext queryContext) { return new ConcatDeviceAndBindSchemaForExpressionVisitor() .process( expression, - new ConcatDeviceAndBindSchemaForExpressionVisitor.Context(devicePath, schemaTree)); + new 
ConcatDeviceAndBindSchemaForExpressionVisitor.Context( + devicePath, schemaTree, queryContext)); } /** @@ -459,12 +475,16 @@ public class ExpressionAnalyzer { * @return the expression list with full path and after binding schema */ public static List<Expression> concatDeviceAndBindSchemaForPredicate( - Expression predicate, PartialPath devicePath, ISchemaTree schemaTree, boolean isWhere) { + final Expression predicate, + final PartialPath devicePath, + final ISchemaTree schemaTree, + final boolean isWhere, + final MPPQueryContext queryContext) { return new ConcatDeviceAndBindSchemaForPredicateVisitor() .process( predicate, new ConcatDeviceAndBindSchemaForPredicateVisitor.Context( - devicePath, schemaTree, isWhere)); + devicePath, schemaTree, isWhere, queryContext)); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java index 04b348a2462..fc9d3a0aa51 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.analyze; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.UnknownExpressionTypeException; @@ -61,11 +62,16 @@ public class ExpressionUtils { // util class } - public static List<Expression> reconstructTimeSeriesOperands( - TimeSeriesOperand rawExpression, List<? extends PartialPath> actualPaths) { + /* Use queryContext to record the memory usage of the constructed Expression. 
*/ + public static List<Expression> reconstructTimeSeriesOperandsWithMemoryCheck( + final TimeSeriesOperand rawExpression, + final List<? extends PartialPath> actualPaths, + final MPPQueryContext queryContext) { List<Expression> resultExpressions = new ArrayList<>(); for (PartialPath actualPath : actualPaths) { - resultExpressions.add(reconstructTimeSeriesOperand(rawExpression, actualPath)); + resultExpressions.add( + reserveMemoryForExpression( + queryContext, reconstructTimeSeriesOperand(rawExpression, actualPath))); } return resultExpressions; } @@ -76,11 +82,15 @@ public class ExpressionUtils { return cloneCommonFields(rawExpression, resultExpression); } - public static List<Expression> reconstructFunctionExpressions( - FunctionExpression expression, List<List<Expression>> childExpressionsList) { + public static List<Expression> reconstructFunctionExpressionsWithMemoryCheck( + final FunctionExpression expression, + final List<List<Expression>> childExpressionsList, + final MPPQueryContext queryContext) { List<Expression> resultExpressions = new ArrayList<>(); for (List<Expression> functionExpressions : childExpressionsList) { - resultExpressions.add(reconstructFunctionExpression(expression, functionExpressions)); + resultExpressions.add( + reserveMemoryForExpression( + queryContext, reconstructFunctionExpression(expression, functionExpressions))); } return resultExpressions; } @@ -107,11 +117,15 @@ public class ExpressionUtils { return cloneCommonFields(rawExpression, resultExpression); } - public static List<Expression> reconstructUnaryExpressions( - UnaryExpression expression, List<Expression> childExpressions) { + public static List<Expression> reconstructUnaryExpressionsWithMemoryCheck( + final UnaryExpression expression, + final List<Expression> childExpressions, + final MPPQueryContext queryContext) { List<Expression> resultExpressions = new ArrayList<>(); for (Expression childExpression : childExpressions) { - 
resultExpressions.add(reconstructUnaryExpression(expression, childExpression)); + resultExpressions.add( + reserveMemoryForExpression( + queryContext, reconstructUnaryExpression(expression, childExpression))); } return resultExpressions; } @@ -172,14 +186,17 @@ public class ExpressionUtils { return cloneCommonFields(rawExpression, resultExpression); } - public static List<Expression> reconstructBinaryExpressions( - BinaryExpression expression, - List<Expression> leftExpressions, - List<Expression> rightExpressions) { + public static List<Expression> reconstructBinaryExpressionsWithMemoryCheck( + final BinaryExpression expression, + final List<Expression> leftExpressions, + final List<Expression> rightExpressions, + final MPPQueryContext queryContext) { List<Expression> resultExpressions = new ArrayList<>(); for (Expression le : leftExpressions) { for (Expression re : rightExpressions) { - resultExpressions.add(reconstructBinaryExpression(expression, le, re)); + resultExpressions.add( + reserveMemoryForExpression( + queryContext, reconstructBinaryExpression(expression, le, re))); } } return resultExpressions; @@ -238,16 +255,19 @@ public class ExpressionUtils { return cloneCommonFields(rawExpression, resultExpression); } - public static List<Expression> reconstructTernaryExpressions( - TernaryExpression expression, - List<Expression> firstExpressions, - List<Expression> secondExpressions, - List<Expression> thirdExpressions) { + public static List<Expression> reconstructTernaryExpressionsWithMemoryCheck( + final TernaryExpression expression, + final List<Expression> firstExpressions, + final List<Expression> secondExpressions, + final List<Expression> thirdExpressions, + final MPPQueryContext queryContext) { List<Expression> resultExpressions = new ArrayList<>(); for (Expression fe : firstExpressions) { for (Expression se : secondExpressions) for (Expression te : thirdExpressions) { - resultExpressions.add(reconstructTernaryExpression(expression, fe, se, te)); + 
resultExpressions.add( + reserveMemoryForExpression( + queryContext, reconstructTernaryExpression(expression, fe, se, te))); } } return resultExpressions; @@ -278,6 +298,12 @@ public class ExpressionUtils { return resultExpression; } + private static Expression reserveMemoryForExpression( + MPPQueryContext queryContext, Expression expression) { + queryContext.reserveMemoryForFrontEnd(expression == null ? 0 : expression.ramBytesUsed()); + return expression; + } + /** * Make cartesian product. Attention, in this implementation, the way to handle the empty set is * to ignore it instead of making the result an empty set. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java index 856476eae24..e1fbc88bbcf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java @@ -183,7 +183,7 @@ public class TemplatedAnalyze { } analysis.setDeviceList(deviceList); - analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree, deviceList); + analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree, deviceList, context); analyzeDeviceToSourceTransform(analysis); analyzeDeviceToSource(analysis); @@ -272,7 +272,8 @@ public class TemplatedAnalyze { Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - List<PartialPath> deviceSet) { + List<PartialPath> deviceSet, + MPPQueryContext queryContext) { if (!queryStatement.hasOrderByExpression()) { return; } @@ -285,7 +286,8 @@ public class TemplatedAnalyze { Set<Expression> orderByExpressionsForOneDevice = new LinkedHashSet<>(); for (Expression expressionForItem : queryStatement.getExpressionSortItemList()) { List<Expression> expressions = - concatDeviceAndBindSchemaForExpression(expressionForItem, 
device, schemaTree); + concatDeviceAndBindSchemaForExpression( + expressionForItem, device, schemaTree, queryContext); if (expressions.isEmpty()) { throw new SemanticException( String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/Expression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/Expression.java index 48b1b5806dc..9b89a122b06 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/Expression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/Expression.java @@ -56,6 +56,7 @@ import org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssi import org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFExecutor; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Accountable; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -71,7 +72,7 @@ import java.util.NoSuchElementException; import java.util.Objects; /** A skeleton class for expression */ -public abstract class Expression extends StatementNode { +public abstract class Expression extends StatementNode implements Accountable { ///////////////////////////////////////////////////////////////////////////////////////////////// // Operations that Class Expression is not responsible for should be done through a visitor ///////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/binary/BinaryExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/binary/BinaryExpression.java index 793e09d98f8..39189759a60 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/binary/BinaryExpression.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/binary/BinaryExpression.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.binary; import org.apache.iotdb.db.queryengine.common.NodeRef; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; @@ -27,6 +28,7 @@ import org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssi import org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFExecutor; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.RamUsageEstimator; import java.io.DataOutputStream; import java.io.IOException; @@ -38,6 +40,9 @@ import java.util.Map; public abstract class BinaryExpression extends Expression { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(BinaryExpression.class); + protected Expression leftExpression; protected Expression rightExpression; @@ -156,6 +161,13 @@ public abstract class BinaryExpression extends Expression { return buildExpression(left, right); } + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(leftExpression) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(rightExpression); + } + private String buildExpression(String left, String right) { StringBuilder builder = new StringBuilder(); if (leftExpression.getExpressionType().getPriority() < this.getExpressionType().getPriority()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/ConstantOperand.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/ConstantOperand.java index 92a7a7b328c..f4527cdc755 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/ConstantOperand.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/ConstantOperand.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation import org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssigner; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -40,6 +41,9 @@ public class ConstantOperand extends LeafOperand { public static final ConstantOperand FALSE = new ConstantOperand(TSDataType.BOOLEAN, "false"); public static final ConstantOperand TRUE = new ConstantOperand(TSDataType.BOOLEAN, "true"); + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(ConstantOperand.class); + private final String valueString; private final TSDataType dataType; @@ -116,4 +120,9 @@ public class ConstantOperand extends LeafOperand { dataType.serializeTo(stream); ReadWriteIOUtils.write(valueString, stream); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + RamUsageEstimator.sizeOf(valueString); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/NullOperand.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/NullOperand.java index 4c7181b9f81..4e2781a133c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/NullOperand.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/NullOperand.java @@ -24,6 +24,8 @@ import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import 
org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssigner; +import org.apache.tsfile.utils.RamUsageEstimator; + import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -31,6 +33,9 @@ import java.util.List; import java.util.Map; public class NullOperand extends LeafOperand { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(NullOperand.class); + @Override public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { return visitor.visitNullOperand(this, context); @@ -71,4 +76,9 @@ public class NullOperand extends LeafOperand { protected void serialize(DataOutputStream stream) throws IOException { // do nothing } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimeSeriesOperand.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimeSeriesOperand.java index a05b2577d93..22ded4d95d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimeSeriesOperand.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimeSeriesOperand.java @@ -22,12 +22,14 @@ package org.apache.iotdb.db.queryengine.plan.expression.leaf; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssigner; import org.apache.tsfile.enums.TSDataType; +import 
org.apache.tsfile.utils.RamUsageEstimator; import java.io.DataOutputStream; import java.io.IOException; @@ -37,6 +39,9 @@ import java.util.Map; public class TimeSeriesOperand extends LeafOperand { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TimeSeriesOperand.class); + private PartialPath path; public TimeSeriesOperand(PartialPath path) { @@ -106,4 +111,9 @@ public class TimeSeriesOperand extends LeafOperand { protected void serialize(DataOutputStream stream) throws IOException { path.serialize(stream); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(path); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimestampOperand.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimestampOperand.java index ece2f5ad9ef..6c817a20336 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimestampOperand.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimestampOperand.java @@ -24,6 +24,8 @@ import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssigner; +import org.apache.tsfile.utils.RamUsageEstimator; + import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -32,6 +34,9 @@ import java.util.Map; public class TimestampOperand extends LeafOperand { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TimestampOperand.class); + public static final String TIMESTAMP_EXPRESSION_STRING = "Time"; public TimestampOperand() { @@ -82,4 +87,9 @@ public class TimestampOperand extends LeafOperand { protected void 
serialize(DataOutputStream stream) throws IOException { // do nothing } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java index 4a0ff5458cf..a17864ff3be 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction; import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction; import org.apache.iotdb.commons.udf.service.UDFManagementService; import org.apache.iotdb.db.queryengine.common.NodeRef; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; @@ -37,6 +38,7 @@ import org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFInformationInf import org.apache.iotdb.udf.api.customizer.strategy.AccessStrategy; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -53,6 +55,9 @@ import java.util.stream.Collectors; public class FunctionExpression extends Expression { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(FunctionExpression.class); + private FunctionType functionType; private final String functionName; @@ -392,4 +397,25 @@ public class FunctionExpression extends Expression { Expression.serialize(expression, stream); } } + + @Override + 
public long ramBytesUsed() { + return INSTANCE_SIZE + + RamUsageEstimator.sizeOf(functionName) + + RamUsageEstimator.sizeOf(parametersString) + + RamUsageEstimator.sizeOfMap(functionAttributes) + + (expressions == null + ? 0 + : expressions.stream() + .mapToLong(MemoryEstimationHelper::getEstimatedSizeOfAccountableObject) + .sum()) + + (paths == null + ? 0 + : paths.stream().mapToLong(MemoryEstimationHelper::getEstimatedSizeOfPartialPath).sum()) + + (countTimeExpressions == null + ? 0 + : countTimeExpressions.stream() + .mapToLong(MemoryEstimationHelper::getEstimatedSizeOfAccountableObject) + .sum()); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/CaseWhenThenExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/CaseWhenThenExpression.java index cd28e9f110e..142473af853 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/CaseWhenThenExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/CaseWhenThenExpression.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.other; import org.apache.iotdb.db.queryengine.common.NodeRef; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.binary.WhenThenExpression; @@ -31,6 +32,7 @@ import org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFExecutor; import org.apache.commons.lang3.Validate; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -42,6 +44,9 @@ import java.util.List; import java.util.Map; public class CaseWhenThenExpression extends 
Expression { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(CaseWhenThenExpression.class); protected List<WhenThenExpression> whenThenExpressions = new ArrayList<>(); protected Expression elseExpression; @@ -183,4 +188,15 @@ public class CaseWhenThenExpression extends Expression { public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { return visitor.visitCaseWhenThenExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(elseExpression) + + (whenThenExpressions == null + ? 0 + : whenThenExpressions.stream() + .mapToLong(MemoryEstimationHelper::getEstimatedSizeOfAccountableObject) + .sum()); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/GroupByTimeExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/GroupByTimeExpression.java index 21ed6187449..4ee3c174bc4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/GroupByTimeExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/GroupByTimeExpression.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssi import org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFExecutor; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.utils.TimeDuration; @@ -41,6 +42,9 @@ import java.util.Map; /** Only used for representing GROUP BY TIME filter. 
*/ public class GroupByTimeExpression extends Expression { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(GroupByTimeExpression.class); + // [startTime, endTime] private final long startTime; private final long endTime; @@ -157,4 +161,9 @@ public class GroupByTimeExpression extends Expression { public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { return visitor.visitGroupByTimeExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/ternary/BetweenExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/ternary/BetweenExpression.java index 04025bcab62..00b307df340 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/ternary/BetweenExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/ternary/BetweenExpression.java @@ -21,10 +21,12 @@ package org.apache.iotdb.db.queryengine.plan.expression.ternary; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -32,6 +34,10 @@ import java.io.IOException; import java.nio.ByteBuffer; public class BetweenExpression extends TernaryExpression { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(BetweenExpression.class); + private final boolean isNotBetween; public boolean isNotBetween() { @@ -102,4 +108,12 @@ public class BetweenExpression extends TernaryExpression { public <R, C> R 
accept(ExpressionVisitor<R, C> visitor, C context) { return visitor.visitBetweenExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(firstExpression) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(secondExpression) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(thirdExpression); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/InExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/InExpression.java index 5943b209ace..0ace411cd94 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/InExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/InExpression.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.unary; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; @@ -27,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.validation.constraints.NotNull; @@ -38,7 +40,8 @@ import java.util.Iterator; import java.util.LinkedHashSet; public class InExpression extends UnaryExpression { - + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(InExpression.class); private final boolean isNotIn; private final LinkedHashSet<String> 
values; @@ -138,4 +141,11 @@ public class InExpression extends UnaryExpression { public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { return visitor.visitInExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression) + + (values == null ? 0 : values.stream().mapToLong(RamUsageEstimator::sizeOf).sum()); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/IsNullExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/IsNullExpression.java index 75d55b242fd..9542b29e2da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/IsNullExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/IsNullExpression.java @@ -19,10 +19,12 @@ package org.apache.iotdb.db.queryengine.plan.expression.unary; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -30,6 +32,9 @@ import java.io.IOException; import java.nio.ByteBuffer; public class IsNullExpression extends UnaryExpression { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(IsNullExpression.class); private final boolean isNot; public IsNullExpression(Expression expression, boolean isNot) { @@ -77,4 +82,9 @@ public class IsNullExpression extends UnaryExpression { public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { return visitor.visitIsNullExpression(this, context); } + + @Override 
+ public long ramBytesUsed() { + return INSTANCE_SIZE + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LikeExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LikeExpression.java index fdc158a9f76..a83824ce68d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LikeExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LikeExpression.java @@ -19,10 +19,12 @@ package org.apache.iotdb.db.queryengine.plan.expression.unary; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -35,6 +37,9 @@ import static org.apache.tsfile.utils.RegexUtils.parseLikePatternToRegex; public class LikeExpression extends UnaryExpression { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(LikeExpression.class); + private final String patternString; private final Pattern pattern; @@ -107,4 +112,11 @@ public class LikeExpression extends UnaryExpression { public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { return visitor.visitLikeExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression) + + RamUsageEstimator.sizeOf(patternString); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LogicNotExpression.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LogicNotExpression.java index 69719cfd415..9797376e545 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LogicNotExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LogicNotExpression.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.unary; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; @@ -27,10 +28,15 @@ import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; +import org.apache.tsfile.utils.RamUsageEstimator; + import java.nio.ByteBuffer; public class LogicNotExpression extends UnaryExpression { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(LogicNotExpression.class); + public LogicNotExpression(Expression expression) { super(expression); } @@ -64,4 +70,9 @@ public class LogicNotExpression extends UnaryExpression { public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { return visitor.visitLogicNotExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/NegationExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/NegationExpression.java index 52df786b486..d496a05c261 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/NegationExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/NegationExpression.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.unary; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; @@ -27,10 +28,15 @@ import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; +import org.apache.tsfile.utils.RamUsageEstimator; + import java.nio.ByteBuffer; public class NegationExpression extends UnaryExpression { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(NegationExpression.class); + public NegationExpression(Expression expression) { super(expression); } @@ -70,4 +76,9 @@ public class NegationExpression extends UnaryExpression { public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { return visitor.visitNegationExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/RegularExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/RegularExpression.java index 39a1baac949..4d517e219e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/RegularExpression.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/RegularExpression.java @@ -19,11 +19,13 @@ package org.apache.iotdb.db.queryengine.plan.expression.unary; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; import org.apache.commons.lang3.Validate; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -35,6 +37,9 @@ import static org.apache.tsfile.utils.RegexUtils.compileRegex; public class RegularExpression extends UnaryExpression { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(RegularExpression.class); + private final String patternString; private final Pattern pattern; @@ -111,4 +116,11 @@ public class RegularExpression extends UnaryExpression { public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { return visitor.visitRegularExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression) + + RamUsageEstimator.sizeOf(patternString); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java index 7383e2784eb..fdbd9c52142 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java @@ -23,6 
+23,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.view.LogicalViewSchema; import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils; import org.apache.iotdb.db.queryengine.plan.expression.Expression; @@ -34,6 +35,7 @@ import org.apache.iotdb.db.queryengine.plan.expression.visitor.CompleteMeasureme import org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.TransformToExpressionVisitor; import org.apache.iotdb.db.utils.constant.SqlConstant; +import org.apache.commons.lang3.Validate; import org.apache.tsfile.write.schema.IMeasurementSchema; import java.util.ArrayList; @@ -43,20 +45,21 @@ import java.util.List; import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck; import static org.apache.iotdb.db.utils.TypeInferenceUtils.bindTypeForBuiltinAggregationNonSeriesInputExpressions; import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME; -public class BindSchemaForExpressionVisitor extends CartesianProductVisitor<ISchemaTree> { +public class BindSchemaForExpressionVisitor + extends CartesianProductVisitor<BindSchemaForExpressionVisitor.Context> { @Override public List<Expression> visitFunctionExpression( - FunctionExpression functionExpression, ISchemaTree schemaTree) { + FunctionExpression functionExpression, Context context) { if (COUNT_TIME.equalsIgnoreCase(functionExpression.getFunctionName())) { List<Expression> usedExpressions = 
functionExpression.getExpressions().stream() - .flatMap(e -> process(e, schemaTree).stream()) + .flatMap(e -> process(e, context).stream()) .collect(Collectors.toList()); Expression countTimeExpression = @@ -73,7 +76,7 @@ public class BindSchemaForExpressionVisitor extends CartesianProductVisitor<ISch // to collect the produced expressions. List<List<Expression>> extendedExpressions = new ArrayList<>(); for (Expression originExpression : functionExpression.getExpressions()) { - List<Expression> actualExpressions = process(originExpression, schemaTree); + List<Expression> actualExpressions = process(originExpression, context); if (actualExpressions.isEmpty()) { // Let's ignore the eval of the function which has at least one non-existence series as // input. See IOTDB-1212: https://github.com/apache/iotdb/pull/3101 @@ -96,15 +99,16 @@ public class BindSchemaForExpressionVisitor extends CartesianProductVisitor<ISch List<List<Expression>> childExpressionsList = new ArrayList<>(); cartesianProduct(extendedExpressions, childExpressionsList, 0, new ArrayList<>()); - return reconstructFunctionExpressions(functionExpression, childExpressionsList); + return reconstructFunctionExpressionsWithMemoryCheck( + functionExpression, childExpressionsList, context.getQueryContext()); } @Override public List<Expression> visitTimeSeriesOperand( - TimeSeriesOperand timeSeriesOperand, ISchemaTree schemaTree) { + TimeSeriesOperand timeSeriesOperand, Context context) { PartialPath timeSeriesOperandPath = timeSeriesOperand.getPath(); List<MeasurementPath> actualPaths = - schemaTree.searchMeasurementPaths(timeSeriesOperandPath).left; + context.getSchemaTree().searchMeasurementPaths(timeSeriesOperandPath).left; // process logical view List<MeasurementPath> nonViewActualPaths = new ArrayList<>(); List<MeasurementPath> viewPaths = new ArrayList<>(); @@ -116,10 +120,11 @@ public class BindSchemaForExpressionVisitor extends CartesianProductVisitor<ISch } } List<Expression> 
reconstructTimeSeriesOperands = - ExpressionUtils.reconstructTimeSeriesOperands(timeSeriesOperand, nonViewActualPaths); + ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck( + timeSeriesOperand, nonViewActualPaths, context.getQueryContext()); // handle logical views for (MeasurementPath measurementPath : viewPaths) { - Expression replacedExpression = transformViewPath(measurementPath, schemaTree); + Expression replacedExpression = transformViewPath(measurementPath, context.getSchemaTree()); replacedExpression.setViewPath(measurementPath); reconstructTimeSeriesOperands.add(replacedExpression); } @@ -128,13 +133,12 @@ public class BindSchemaForExpressionVisitor extends CartesianProductVisitor<ISch @Override public List<Expression> visitTimeStampOperand( - TimestampOperand timestampOperand, ISchemaTree schemaTree) { + TimestampOperand timestampOperand, Context context) { return Collections.singletonList(timestampOperand); } @Override - public List<Expression> visitConstantOperand( - ConstantOperand constantOperand, ISchemaTree schemaTree) { + public List<Expression> visitConstantOperand(ConstantOperand constantOperand, Context context) { return Collections.singletonList(constantOperand); } @@ -152,4 +156,24 @@ public class BindSchemaForExpressionVisitor extends CartesianProductVisitor<ISch expression = new CompleteMeasurementSchemaVisitor().process(expression, schemaTree); return expression; } + + public static class Context implements QueryContextProvider { + private final ISchemaTree schemaTree; + private final MPPQueryContext queryContext; + + public Context(final ISchemaTree schemaTree, final MPPQueryContext queryContext) { + this.schemaTree = schemaTree; + Validate.notNull(queryContext, "QueryContext is null"); + this.queryContext = queryContext; + } + + public ISchemaTree getSchemaTree() { + return schemaTree; + } + + @Override + public MPPQueryContext getQueryContext() { + return queryContext; + } + } } diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java index f9c9d7e7d0e..485a5ba3d4c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; @@ -32,6 +33,8 @@ import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.utils.constant.SqlConstant; +import org.apache.commons.lang3.Validate; + import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; @@ -39,9 +42,9 @@ import java.util.List; import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructBinaryExpressions; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperands; +import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructBinaryExpressionsWithMemoryCheck; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck; import static org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian.BindSchemaForExpressionVisitor.transformViewPath; import static org.apache.iotdb.db.utils.TypeInferenceUtils.bindTypeForBuiltinAggregationNonSeriesInputExpressions; import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME; @@ -61,7 +64,8 @@ public class BindSchemaForPredicateVisitor resultExpressions.addAll(rightExpressions); return resultExpressions; } - return reconstructBinaryExpressions(binaryExpression, leftExpressions, rightExpressions); + return reconstructBinaryExpressionsWithMemoryCheck( + binaryExpression, leftExpressions, rightExpressions, context.getQueryContext()); } @Override @@ -86,7 +90,11 @@ public class BindSchemaForPredicateVisitor extendedExpressions.add( process( suffixExpression, - new Context(context.getPrefixPaths(), context.getSchemaTree(), false))); + new Context( + context.getPrefixPaths(), + context.getSchemaTree(), + false, + context.getQueryContext()))); // We just process first input Expression of Count_IF, // keep other input Expressions as origin and bind Type @@ -99,7 +107,8 @@ public class BindSchemaForPredicateVisitor } List<List<Expression>> childExpressionsList = new ArrayList<>(); cartesianProduct(extendedExpressions, childExpressionsList, 0, new ArrayList<>()); - return reconstructFunctionExpressions(predicate, childExpressionsList); + return reconstructFunctionExpressionsWithMemoryCheck( + predicate, childExpressionsList, context.getQueryContext()); } @Override @@ -130,7 +139,8 @@ public class BindSchemaForPredicateVisitor } } List<Expression> reconstructTimeSeriesOperands = - 
reconstructTimeSeriesOperands(predicate, nonViewPathList); + reconstructTimeSeriesOperandsWithMemoryCheck( + predicate, nonViewPathList, context.getQueryContext()); for (MeasurementPath measurementPath : viewPathList) { Expression replacedExpression = transformViewPath(measurementPath, context.getSchemaTree()); replacedExpression.setViewPath(measurementPath); @@ -150,19 +160,27 @@ public class BindSchemaForPredicateVisitor return Collections.singletonList(constantOperand); } - public static class Context { + public static class Context implements QueryContextProvider { private final List<PartialPath> prefixPaths; private final ISchemaTree schemaTree; private final boolean isRoot; - public Context(List<PartialPath> prefixPaths, ISchemaTree schemaTree, boolean isRoot) { + private final MPPQueryContext queryContext; + + public Context( + final List<PartialPath> prefixPaths, + final ISchemaTree schemaTree, + final boolean isRoot, + final MPPQueryContext queryContext) { this.prefixPaths = prefixPaths; this.schemaTree = schemaTree; this.isRoot = isRoot; + Validate.notNull(queryContext, "QueryContext is null"); + this.queryContext = queryContext; } public Context notRootClone() { - return new Context(this.prefixPaths, this.schemaTree, false); + return new Context(this.prefixPaths, this.schemaTree, false, queryContext); } public List<PartialPath> getPrefixPaths() { @@ -176,5 +194,10 @@ public class BindSchemaForPredicateVisitor public boolean isRoot() { return isRoot; } + + @Override + public MPPQueryContext getQueryContext() { + return queryContext; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/CartesianProductVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/CartesianProductVisitor.java index 0f8e7decc62..78a9b2ab974 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/CartesianProductVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/CartesianProductVisitor.java @@ -32,34 +32,39 @@ import java.util.Collections; import java.util.List; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructBinaryExpressions; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructBinaryExpressionsWithMemoryCheck; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructCaseWhenThenExpression; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTernaryExpressions; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructUnaryExpressions; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTernaryExpressionsWithMemoryCheck; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructUnaryExpressionsWithMemoryCheck; -public abstract class CartesianProductVisitor<C> +public abstract class CartesianProductVisitor<C extends QueryContextProvider> extends ExpressionAnalyzeVisitor<List<Expression>, C> { @Override public List<Expression> visitTernaryExpression(TernaryExpression ternaryExpression, C context) { List<List<Expression>> childResultsList = getResultsFromChild(ternaryExpression, context); - return reconstructTernaryExpressions( + return reconstructTernaryExpressionsWithMemoryCheck( ternaryExpression, childResultsList.get(0), childResultsList.get(1), - childResultsList.get(2)); + childResultsList.get(2), + context.getQueryContext()); } @Override public List<Expression> visitBinaryExpression(BinaryExpression binaryExpression, C context) { List<List<Expression>> 
childResultsList = getResultsFromChild(binaryExpression, context); - return reconstructBinaryExpressions( - binaryExpression, childResultsList.get(0), childResultsList.get(1)); + return reconstructBinaryExpressionsWithMemoryCheck( + binaryExpression, + childResultsList.get(0), + childResultsList.get(1), + context.getQueryContext()); } @Override public List<Expression> visitUnaryExpression(UnaryExpression unaryExpression, C context) { List<List<Expression>> childResultsList = getResultsFromChild(unaryExpression, context); - return reconstructUnaryExpressions(unaryExpression, childResultsList.get(0)); + return reconstructUnaryExpressionsWithMemoryCheck( + unaryExpression, childResultsList.get(0), context.getQueryContext()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java index 25963b4b4f7..fc0bdb6f669 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils; import org.apache.iotdb.db.queryengine.plan.expression.Expression; @@ -31,6 +32,8 @@ import 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.utils.constant.SqlConstant; +import org.apache.commons.lang3.Validate; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,7 +42,7 @@ import java.util.List; import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck; import static org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian.BindSchemaForExpressionVisitor.transformViewPath; import static org.apache.iotdb.db.utils.TypeInferenceUtils.bindTypeForBuiltinAggregationNonSeriesInputExpressions; import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME; @@ -82,7 +85,8 @@ public class ConcatDeviceAndBindSchemaForExpressionVisitor List<List<Expression>> childExpressionsList = new ArrayList<>(); cartesianProduct(extendedExpressions, childExpressionsList, 0, new ArrayList<>()); - return reconstructFunctionExpressions(functionExpression, childExpressionsList); + return reconstructFunctionExpressionsWithMemoryCheck( + functionExpression, childExpressionsList, context.getQueryContext()); } @Override @@ -108,7 +112,8 @@ public class ConcatDeviceAndBindSchemaForExpressionVisitor } } List<Expression> reconstructTimeSeriesOperands = - ExpressionUtils.reconstructTimeSeriesOperands(timeSeriesOperand, nonViewActualPaths); + ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck( + timeSeriesOperand, nonViewActualPaths, context.getQueryContext()); // handle logical views for (MeasurementPath measurementPath : viewPaths) { Expression replacedExpression = 
transformViewPath(measurementPath, context.getSchemaTree()); @@ -134,13 +139,20 @@ public class ConcatDeviceAndBindSchemaForExpressionVisitor return Collections.singletonList(constantOperand); } - public static class Context { + public static class Context implements QueryContextProvider { private final PartialPath devicePath; private final ISchemaTree schemaTree; - public Context(PartialPath devicePath, ISchemaTree schemaTree) { + private final MPPQueryContext queryContext; + + public Context( + final PartialPath devicePath, + final ISchemaTree schemaTree, + final MPPQueryContext queryContext) { this.devicePath = devicePath; this.schemaTree = schemaTree; + Validate.notNull(queryContext, "QueryContext is null"); + this.queryContext = queryContext; } public PartialPath getDevicePath() { @@ -150,5 +162,10 @@ public class ConcatDeviceAndBindSchemaForExpressionVisitor public ISchemaTree getSchemaTree() { return schemaTree; } + + @Override + public MPPQueryContext getQueryContext() { + return queryContext; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java index 7284568f34f..372c861481a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.sql.SemanticException; +import 
org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; @@ -30,13 +31,15 @@ import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; +import org.apache.commons.lang3.Validate; + import java.util.ArrayList; import java.util.Collections; import java.util.List; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperands; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck; import static org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian.BindSchemaForExpressionVisitor.transformViewPath; public class ConcatDeviceAndBindSchemaForPredicateVisitor @@ -53,7 +56,8 @@ public class ConcatDeviceAndBindSchemaForPredicateVisitor } List<List<Expression>> childExpressionsList = new ArrayList<>(); cartesianProduct(extendedExpressions, childExpressionsList, 0, new ArrayList<>()); - return reconstructFunctionExpressions(predicate, childExpressionsList); + return reconstructFunctionExpressionsWithMemoryCheck( + predicate, childExpressionsList, context.getQueryContext()); } @Override @@ -77,7 +81,8 @@ public class ConcatDeviceAndBindSchemaForPredicateVisitor } List<Expression> reconstructTimeSeriesOperands = - reconstructTimeSeriesOperands(predicate, 
nonViewPathList); + reconstructTimeSeriesOperandsWithMemoryCheck( + predicate, nonViewPathList, context.getQueryContext()); for (MeasurementPath measurementPath : viewPathList) { Expression replacedExpression = transformViewPath(measurementPath, context.getSchemaTree()); if (!(replacedExpression instanceof TimeSeriesOperand)) { @@ -102,15 +107,23 @@ public class ConcatDeviceAndBindSchemaForPredicateVisitor return Collections.singletonList(constantOperand); } - public static class Context { + public static class Context implements QueryContextProvider { private final PartialPath devicePath; private final ISchemaTree schemaTree; private final boolean isWhere; - public Context(PartialPath devicePath, ISchemaTree schemaTree, boolean isWhere) { + private final MPPQueryContext queryContext; + + public Context( + final PartialPath devicePath, + final ISchemaTree schemaTree, + final boolean isWhere, + final MPPQueryContext queryContext) { this.devicePath = devicePath; this.schemaTree = schemaTree; this.isWhere = isWhere; + Validate.notNull(queryContext, "QueryContext is null"); + this.queryContext = queryContext; } public PartialPath getDevicePath() { @@ -124,5 +137,10 @@ public class ConcatDeviceAndBindSchemaForPredicateVisitor public boolean isWhere() { return isWhere; } + + @Override + public MPPQueryContext getQueryContext() { + return queryContext; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java index 3f53190c00d..7c852083e2b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; @@ -28,6 +29,7 @@ import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.utils.constant.SqlConstant; +import org.apache.commons.lang3.Validate; import org.apache.tsfile.common.constant.TsFileConstant; import java.util.ArrayList; @@ -35,8 +37,8 @@ import java.util.Collections; import java.util.List; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperands; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck; public class ConcatExpressionWithSuffixPathsVisitor extends CartesianProductVisitor<ConcatExpressionWithSuffixPathsVisitor.Context> { @@ -60,7 +62,8 @@ public class ConcatExpressionWithSuffixPathsVisitor List<List<Expression>> childExpressionsList = new ArrayList<>(); cartesianProduct(extendedExpressions, childExpressionsList, 0, new ArrayList<>()); - return 
reconstructFunctionExpressions(functionExpression, childExpressionsList); + return reconstructFunctionExpressionsWithMemoryCheck( + functionExpression, childExpressionsList, context.getQueryContext()); } @Override @@ -78,7 +81,8 @@ public class ConcatExpressionWithSuffixPathsVisitor actualPaths.add(concatPath); } } - return reconstructTimeSeriesOperands(timeSeriesOperand, actualPaths); + return reconstructTimeSeriesOperandsWithMemoryCheck( + timeSeriesOperand, actualPaths, context.getQueryContext()); } @Override @@ -92,13 +96,20 @@ public class ConcatExpressionWithSuffixPathsVisitor return Collections.singletonList(constantOperand); } - public static class Context { + public static class Context implements QueryContextProvider { private final List<PartialPath> prefixPaths; private final PathPatternTree patternTree; - public Context(List<PartialPath> prefixPaths, PathPatternTree patternTree) { + private final MPPQueryContext queryContext; + + public Context( + final List<PartialPath> prefixPaths, + final PathPatternTree patternTree, + final MPPQueryContext queryContext) { this.prefixPaths = prefixPaths; this.patternTree = patternTree; + Validate.notNull(queryContext, "QueryContext is null"); + this.queryContext = queryContext; } public List<PartialPath> getPrefixPaths() { @@ -108,5 +119,10 @@ public class ConcatExpressionWithSuffixPathsVisitor public PathPatternTree getPatternTree() { return patternTree; } + + @Override + public MPPQueryContext getQueryContext() { + return queryContext; + } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughExceptionTest.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/QueryContextProvider.java similarity index 61% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughExceptionTest.java rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/QueryContextProvider.java index dbf5046f675..2eb0ec0b4e7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughExceptionTest.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/QueryContextProvider.java @@ -17,21 +17,10 @@ * under the License. */ -package org.apache.iotdb.db.queryengine.exception; +package org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian; -import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class MemoryNotEnoughExceptionTest { - - @Test - public void testMemoryNotEnoughExceptionStatusCode() { - MemoryNotEnoughException e = new MemoryNotEnoughException("test"); - assertEquals(TSStatusCode.QUOTA_MEM_QUERY_NOT_ENOUGH.getStatusCode(), e.getErrorCode()); - assertTrue(e.isUserException()); - } +public interface QueryContextProvider { + MPPQueryContext getQueryContext(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java index cf4b3461955..5a58f6c2662 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; +import 
org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; import org.apache.iotdb.db.queryengine.plan.expression.Expression; @@ -60,6 +61,7 @@ import org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaUtils; import org.apache.iotdb.db.utils.SchemaUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.schema.IMeasurementSchema; @@ -407,19 +409,31 @@ public class AggregationPushDown implements PlanOptimizer { GroupByTimeParameter groupByTimeParameter, RewriterContext context) { if (selectPath instanceof MeasurementPath) { // non-aligned series - return new SeriesAggregationScanNode( - context.genPlanNodeId(), - (MeasurementPath) selectPath, - aggregationDescriptorList, - scanOrder, - groupByTimeParameter); + SeriesAggregationSourceNode node = + new SeriesAggregationScanNode( + context.genPlanNodeId(), + (MeasurementPath) selectPath, + aggregationDescriptorList, + scanOrder, + groupByTimeParameter); + context + .getContext() + .reserveMemoryForFrontEnd( + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(node)); + return node; } else if (selectPath instanceof AlignedPath) { // aligned series - return new AlignedSeriesAggregationScanNode( - context.genPlanNodeId(), - (AlignedPath) selectPath, - aggregationDescriptorList, - scanOrder, - groupByTimeParameter); + SeriesAggregationSourceNode node = + new AlignedSeriesAggregationScanNode( + context.genPlanNodeId(), + (AlignedPath) selectPath, + aggregationDescriptorList, + scanOrder, + groupByTimeParameter); + context + .getContext() + .reserveMemoryForFrontEnd( + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(node)); + return node; } else { throw new IllegalArgumentException("unexpected path type"); } @@ -457,6 +471,7 @@ public class AggregationPushDown 
implements PlanOptimizer { public RewriterContext(Analysis analysis, MPPQueryContext context, boolean isAlignByDevice) { this.analysis = analysis; + Validate.notNull(context, "Query context cannot be null."); this.context = context; this.isAlignByDevice = isAlignByDevice; } @@ -473,6 +488,10 @@ public class AggregationPushDown implements PlanOptimizer { this.curDevice = curDevice; } + public MPPQueryContext getContext() { + return context; + } + public Set<Expression> getAggregationExpressions() { if (isAlignByDevice) { return analysis.getDeviceToAggregationExpressions().get(curDevice); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 14fd9be4b05..6a8ba336588 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -213,7 +213,27 @@ public class LocalExecutionPlanner { } } - public synchronized void releaseToFreeMemoryForOperators(long memoryInBytes) { + public synchronized void reserveMemoryForQueryFrontEnd( + final long memoryInBytes, final long reservedBytes, final String queryId) { + if (memoryInBytes > freeMemoryForOperators) { + throw new MemoryNotEnoughException( + String.format( + "There is not enough memory for planning-stage of Query %s, " + + "current remaining free memory is %dB, " + + "estimated memory usage is %dB, reserved memory for FE of this query in total is %dB", + queryId, freeMemoryForOperators, memoryInBytes, reservedBytes)); + } else { + freeMemoryForOperators -= memoryInBytes; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "[ConsumeMemory] consume: {}, current remaining memory: {}", + memoryInBytes, + freeMemoryForOperators); + } + } + } + + public synchronized void 
releaseToFreeMemoryForOperators(final long memoryInBytes) { freeMemoryForOperators += memoryInBytes; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index b1786c85112..9559b29e976 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -31,6 +31,7 @@ import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory; import org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; @@ -82,6 +83,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeri import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; @@ -138,6 +140,7 @@ public class LogicalPlanBuilder { public LogicalPlanBuilder(Analysis analysis, MPPQueryContext 
context) { this.analysis = analysis; + Validate.notNull(context, "Query context cannot be null"); this.context = context; } @@ -195,27 +198,26 @@ public class LogicalPlanBuilder { for (PartialPath path : groupedPaths) { if (path instanceof MeasurementPath) { // non-aligned series - SeriesScanNode seriesScanNode = - new SeriesScanNode( - context.getQueryId().genPlanNodeId(), - (MeasurementPath) path, - scanOrder, - limit, - offset, - null); - sourceNodeList.add(seriesScanNode); + sourceNodeList.add( + reserveMemoryForSeriesSourceNode( + new SeriesScanNode( + context.getQueryId().genPlanNodeId(), + (MeasurementPath) path, + scanOrder, + limit, + offset, + null))); } else if (path instanceof AlignedPath) { - // aligned series - AlignedSeriesScanNode alignedSeriesScanNode = - new AlignedSeriesScanNode( - context.getQueryId().genPlanNodeId(), - (AlignedPath) path, - scanOrder, - limit, - offset, - null, - lastLevelUseWildcard); - sourceNodeList.add(alignedSeriesScanNode); + sourceNodeList.add( + reserveMemoryForSeriesSourceNode( + new AlignedSeriesScanNode( + context.getQueryId().genPlanNodeId(), + (AlignedPath) path, + scanOrder, + limit, + offset, + null, + lastLevelUseWildcard))); } else { throw new IllegalArgumentException("Unexpected path type"); } @@ -274,14 +276,16 @@ public class LogicalPlanBuilder { if (selectedPath.isUnderAlignedEntity()) { // aligned series sourceNodeList.add( - new AlignedLastQueryScanNode( - context.getQueryId().genPlanNodeId(), - new AlignedPath(selectedPath), - outputViewPath)); + reserveMemoryForSeriesSourceNode( + new AlignedLastQueryScanNode( + context.getQueryId().genPlanNodeId(), + new AlignedPath(selectedPath), + outputViewPath))); } else { // non-aligned series sourceNodeList.add( - new LastQueryScanNode( - context.getQueryId().genPlanNodeId(), selectedPath, outputViewPath)); + reserveMemoryForSeriesSourceNode( + new LastQueryScanNode( + context.getQueryId().genPlanNodeId(), selectedPath, outputViewPath))); } } } else { @@ -296,15 
+300,18 @@ public class LogicalPlanBuilder { alignedPath.addMeasurement(measurementPath); } sourceNodeList.add( - new AlignedLastQueryScanNode( - context.getQueryId().genPlanNodeId(), alignedPath, null)); + reserveMemoryForSeriesSourceNode( + new AlignedLastQueryScanNode( + context.getQueryId().genPlanNodeId(), alignedPath, null))); } else { // non-aligned series for (Expression sourceExpression : measurementToExpressionsOfDevice.values()) { MeasurementPath selectedPath = (MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath(); sourceNodeList.add( - new LastQueryScanNode(context.getQueryId().genPlanNodeId(), selectedPath, null)); + reserveMemoryForSeriesSourceNode( + new LastQueryScanNode( + context.getQueryId().genPlanNodeId(), selectedPath, null))); } } } @@ -1356,4 +1363,15 @@ public class LogicalPlanBuilder { this.root = timeseriesRegionScanNode; return this; } + + /** + * There could be a lot of SeriesSourceNodes if there are too many series involved in one query. + * We need to check the memory used by SeriesSourceNodes. (The number of other PlanNodes is rather + * small compared to SourceNodes and could be safely ignored for now.) 
+ */ + private PlanNode reserveMemoryForSeriesSourceNode(final SeriesSourceNode sourceNode) { + this.context.reserveMemoryForFrontEnd( + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceNode)); + return sourceNode; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java index 6c49e4478b8..c0f66e46c6c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.distribution; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.commons.lang3.Validate; import org.apache.tsfile.read.filter.basic.Filter; import java.util.List; @@ -43,6 +44,7 @@ public class DistributionPlanContext { protected DistributionPlanContext(MPPQueryContext queryContext) { this.isRoot = true; + Validate.notNull(queryContext, "Query context cannot be null"); this.queryContext = queryContext; } @@ -90,4 +92,8 @@ public class DistributionPlanContext { public boolean isOneSeriesInMultiRegion() { return oneSeriesInMultiRegion; } + + public MPPQueryContext getQueryContext() { + return queryContext; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java index 3ee8167c662..cae6acfaf63 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; @@ -799,6 +800,10 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> SeriesSourceNode split = (SeriesSourceNode) node.clone(); split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); split.setRegionReplicaSet(dataRegion); + context + .getQueryContext() + .reserveMemoryForFrontEnd( + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(split)); ret.add(split); } return ret; @@ -858,6 +863,10 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> split.setAggregationDescriptorList(leafAggDescriptorList); split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); split.setRegionReplicaSet(dataRegion); + context + .getQueryContext() + .reserveMemoryForFrontEnd( + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(split)); aggregationNode.addChild(split); } return Collections.singletonList(aggregationNode); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java index 764aaf627f7..c00e7ed81ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java 
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; @@ -30,6 +31,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.eclipse.jetty.util.StringUtil; @@ -43,6 +45,9 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; public class AlignedLastQueryScanNode extends LastSeriesSourceNode { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(AlignedLastQueryScanNode.class); + // The path of the target series which will be scanned. 
private final AlignedPath seriesPath; @@ -229,4 +234,12 @@ public class AlignedLastQueryScanNode extends LastSeriesSourceNode { public PartialPath getPartitionPath() { return getSeriesPath(); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) + + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath) + + RamUsageEstimator.sizeOf(outputViewPath); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java index 2b69cdb0bc4..68a284c018d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -35,6 +36,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimePa import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nullable; @@ -47,6 +49,8 @@ import java.util.List; import java.util.Objects; 
public class AlignedSeriesAggregationScanNode extends SeriesAggregationSourceNode { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(AlignedSeriesAggregationScanNode.class); // The paths of the target series which will be aggregated. private AlignedPath alignedPath; @@ -294,4 +298,11 @@ public class AlignedSeriesAggregationScanNode extends SeriesAggregationSourceNod this.getAggregationDescriptorList(), PlanNodeUtil.printRegionReplicaSet(getRegionReplicaSet())); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) + + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(alignedPath); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java index abf9a70a09f..327899e4bf6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; @@ -33,6 +34,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import 
org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nullable; @@ -46,6 +48,9 @@ import java.util.Objects; public class AlignedSeriesScanNode extends SeriesScanSourceNode { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(AlignedSeriesScanNode.class); + // The paths of the target series which will be scanned. private final AlignedPath alignedPath; @@ -260,4 +265,11 @@ public class AlignedSeriesScanNode extends SeriesScanSourceNode { public PartialPath getPartitionPath() { return getAlignedPath(); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) + + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(alignedPath); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java index 6c9cbaa1adb..8d4f8291a62 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; @@ -30,6 +31,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.eclipse.jetty.util.StringUtil; @@ -42,6 +44,9 @@ import java.util.concurrent.atomic.AtomicInteger; public class LastQueryScanNode extends LastSeriesSourceNode { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(LastQueryScanNode.class); + public static final List<String> LAST_QUERY_HEADER_COLUMNS = ImmutableList.of( ColumnHeaderConstant.TIMESERIES, @@ -229,4 +234,12 @@ public class LastQueryScanNode extends LastSeriesSourceNode { return outputViewPath; } } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) + + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath) + + RamUsageEstimator.sizeOf(outputViewPath); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java index 8c3bcb6ec61..01e801cad22 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import 
org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -34,6 +35,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimePa import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nullable; @@ -62,6 +64,9 @@ import java.util.Objects; */ public class SeriesAggregationScanNode extends SeriesAggregationSourceNode { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SeriesAggregationScanNode.class); + // The path of the target series which will be aggregated. private final MeasurementPath seriesPath; @@ -297,4 +302,11 @@ public class SeriesAggregationScanNode extends SeriesAggregationSourceNode { this.getAggregationDescriptorList(), PlanNodeUtil.printRegionReplicaSet(this.getRegionReplicaSet())); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) + + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java index 3bd1087f624..587d1ac248c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.path.MeasurementPath; import 
org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -31,6 +32,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nullable; @@ -50,6 +52,9 @@ import java.util.Objects; */ public class SeriesScanNode extends SeriesScanSourceNode { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SeriesScanNode.class); + // The path of the target series which will be scanned. 
private final MeasurementPath seriesPath; @@ -195,4 +200,11 @@ public class SeriesScanNode extends SeriesScanSourceNode { public PartialPath getPartitionPath() { return getSeriesPath(); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) + + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java index d5b9c64f321..c2e052e76b7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java @@ -23,7 +23,9 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -public abstract class SeriesSourceNode extends SourceNode { +import org.apache.tsfile.utils.Accountable; + +public abstract class SeriesSourceNode extends SourceNode implements Accountable { protected SeriesSourceNode(PlanNodeId id) { super(id); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index 837e4b7ef03..e698ba8487e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException; import 
org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.thrift.OperationType; +import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -147,6 +148,8 @@ public class ErrorHandlingUtils { ((IoTDBException) t.getCause()).getErrorCode(), rootCause.getMessage()); } return RpcUtils.getStatus(TSStatusCode.SEMANTIC_ERROR, rootCause.getMessage()); + } else if (t instanceof MemoryNotEnoughException) { + return RpcUtils.getStatus(TSStatusCode.QUOTA_MEM_QUERY_NOT_ENOUGH, rootCause.getMessage()); } if (t instanceof RuntimeException && rootCause instanceof IoTDBException) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzerTest.java index a4f36c14dd0..1cd47c695d3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzerTest.java @@ -22,6 +22,8 @@ package org.apache.iotdb.db.queryengine.plan.analyze; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.junit.Test; @@ -55,7 +57,8 @@ public class ExpressionAnalyzerTest { and(gt(timeSeries("s1"), intValue("1")), gt(timeSeries("s2"), intValue("1"))), prefixPaths, fakeSchemaTree, - true)); + true, + new MPPQueryContext(new QueryId("test")))); assertEquals( Arrays.asList( @@ -79,6 +82,7 @@ public class ExpressionAnalyzerTest { count(and(gt(timeSeries("s1"), 
intValue("1")), gt(timeSeries("s2"), intValue("1")))), prefixPaths, fakeSchemaTree, - true)); + true, + new MPPQueryContext(new QueryId("test")))); } }
