This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 462c9004a55 Introducing a memory control mechanism during the query
planning stage
462c9004a55 is described below
commit 462c9004a55c99774d8eb69494485f8cae3ceb56
Author: Liao Lanyu <[email protected]>
AuthorDate: Fri May 24 08:34:40 2024 +0800
Introducing a memory control mechanism during the query planning stage
---
.../db/queryengine/common/MPPQueryContext.java | 60 ++++++++
.../exception/MemoryNotEnoughException.java | 7 +-
.../execution/MemoryEstimationHelper.java | 6 +-
.../iotdb/db/queryengine/plan/Coordinator.java | 3 +
.../queryengine/plan/analyze/AnalyzeVisitor.java | 154 ++++++++++++++-------
.../plan/analyze/ConcatPathRewriter.java | 33 +++--
.../plan/analyze/ExpressionAnalyzer.java | 42 ++++--
.../queryengine/plan/analyze/ExpressionUtils.java | 66 ++++++---
.../queryengine/plan/analyze/TemplatedAnalyze.java | 12 +-
.../queryengine/plan/execution/QueryExecution.java | 3 +
.../db/queryengine/plan/expression/Expression.java | 3 +-
.../plan/expression/binary/BinaryExpression.java | 12 ++
.../plan/expression/leaf/ConstantOperand.java | 9 ++
.../plan/expression/leaf/NullOperand.java | 10 ++
.../plan/expression/leaf/TimeSeriesOperand.java | 10 ++
.../plan/expression/leaf/TimestampOperand.java | 10 ++
.../plan/expression/multi/FunctionExpression.java | 26 ++++
.../expression/other/CaseWhenThenExpression.java | 16 +++
.../expression/other/GroupByTimeExpression.java | 9 ++
.../plan/expression/ternary/BetweenExpression.java | 14 ++
.../plan/expression/unary/InExpression.java | 12 +-
.../plan/expression/unary/IsNullExpression.java | 10 ++
.../plan/expression/unary/LikeExpression.java | 12 ++
.../plan/expression/unary/LogicNotExpression.java | 11 ++
.../plan/expression/unary/NegationExpression.java | 11 ++
.../plan/expression/unary/RegularExpression.java | 12 ++
.../cartesian/BindSchemaForExpressionVisitor.java | 50 +++++--
.../cartesian/BindSchemaForPredicateVisitor.java | 43 ++++--
.../visitor/cartesian/CartesianProductVisitor.java | 23 +--
...catDeviceAndBindSchemaForExpressionVisitor.java | 27 +++-
...ncatDeviceAndBindSchemaForPredicateVisitor.java | 30 +++-
.../ConcatExpressionWithSuffixPathsVisitor.java | 28 +++-
.../visitor/cartesian/QueryContextProvider.java} | 19 +--
.../plan/optimization/AggregationPushDown.java | 91 ++++++++++--
.../plan/planner/LocalExecutionPlanner.java | 22 ++-
.../plan/planner/LogicalPlanBuilder.java | 76 ++++++----
.../distribution/DistributionPlanContext.java | 6 +
.../plan/planner/distribution/SourceRewriter.java | 9 ++
.../plan/node/source/AlignedLastQueryScanNode.java | 13 ++
.../source/AlignedSeriesAggregationScanNode.java | 11 ++
.../plan/node/source/AlignedSeriesScanNode.java | 12 ++
.../plan/node/source/LastQueryScanNode.java | 13 ++
.../node/source/SeriesAggregationScanNode.java | 12 ++
.../planner/plan/node/source/SeriesScanNode.java | 12 ++
.../planner/plan/node/source/SeriesSourceNode.java | 4 +-
.../apache/iotdb/db/utils/ErrorHandlingUtils.java | 3 +
.../plan/analyze/ExpressionAnalyzerTest.java | 8 +-
47 files changed, 872 insertions(+), 213 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 1cc2f8663da..3a42ee805b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.statistics.QueryPlanStatistics;
import org.apache.tsfile.read.filter.basic.Filter;
@@ -75,6 +76,19 @@ public class MPPQueryContext {
QueryPlanStatistics queryPlanStatistics = null;
+ // To prevent the query front-end from consuming too much memory, memory must
be reserved when
+ // constructing some Expression and PlanNode instances.
+ private long reservedBytesInTotalForFrontEnd = 0;
+
+ private long bytesToBeReservedForFrontEnd = 0;
+
+ // To avoid reserving memory too frequently, we choose to do it in batches.
This is the lower
+ // bound
+ // for each batch.
+ private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L;
+
+ private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER =
LocalExecutionPlanner.getInstance();
+
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
this.endPointBlackList = new LinkedList<>();
@@ -113,6 +127,7 @@ public class MPPQueryContext {
public void prepareForRetry() {
this.initResultNodeContext();
+ this.releaseMemoryForFrontEnd();
}
private void initResultNodeContext() {
@@ -290,4 +305,49 @@ public class MPPQueryContext {
}
queryPlanStatistics.setLogicalOptimizationCost(logicalOptimizeCost);
}
+
+ // region =========== FE memory related, make sure it's not called
concurrently ===========
+
+ /**
+ * This method does not require concurrency control because the query plan
is generated in a
+ * single-threaded manner.
+ */
+ public void reserveMemoryForFrontEnd(final long bytes) {
+ this.bytesToBeReservedForFrontEnd += bytes;
+ if (this.bytesToBeReservedForFrontEnd >= MEMORY_BATCH_THRESHOLD) {
+ reserveMemoryForFrontEndImmediately();
+ }
+ }
+
+ public void reserveMemoryForFrontEndImmediately() {
+ if (bytesToBeReservedForFrontEnd != 0) {
+ LOCAL_EXECUTION_PLANNER.reserveMemoryForQueryFrontEnd(
+ bytesToBeReservedForFrontEnd, reservedBytesInTotalForFrontEnd,
queryId.getId());
+ this.reservedBytesInTotalForFrontEnd += bytesToBeReservedForFrontEnd;
+ this.bytesToBeReservedForFrontEnd = 0;
+ }
+ }
+
+ public void releaseMemoryForFrontEnd() {
+ if (reservedBytesInTotalForFrontEnd != 0) {
+
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotalForFrontEnd);
+ reservedBytesInTotalForFrontEnd = 0;
+ }
+ }
+
+ public void releaseMemoryForFrontEnd(final long bytes) {
+ if (bytes != 0) {
+ long bytesToRelease;
+ if (bytes <= bytesToBeReservedForFrontEnd) {
+ bytesToBeReservedForFrontEnd -= bytes;
+ } else {
+ bytesToRelease = bytes - bytesToBeReservedForFrontEnd;
+ bytesToBeReservedForFrontEnd = 0;
+
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease);
+ reservedBytesInTotalForFrontEnd -= bytesToRelease;
+ }
+ }
+ }
+
+ // endregion
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java
index 5310a45db97..c0911254cb7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java
@@ -19,12 +19,9 @@
package org.apache.iotdb.db.queryengine.exception;
-import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-public class MemoryNotEnoughException extends IoTDBException {
+public class MemoryNotEnoughException extends RuntimeException {
public MemoryNotEnoughException(String message) {
- super(message, TSStatusCode.QUOTA_MEM_QUERY_NOT_ENOUGH.getStatusCode(),
true);
+ super(message);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
index 85f13bfb779..a18e2dbc58b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
@@ -76,8 +76,10 @@ public class MemoryEstimationHelper {
totalSize += MEASUREMENT_PATH_INSTANCE_SIZE;
MeasurementPath measurementPath = (MeasurementPath) partialPath;
totalSize +=
RamUsageEstimator.sizeOf(measurementPath.getMeasurementAlias());
- totalSize +=
-
RamUsageEstimator.sizeOf(measurementPath.getMeasurementSchema().getMeasurementId());
+ if (measurementPath.getMeasurementSchema() != null) {
+ totalSize +=
+
RamUsageEstimator.sizeOf(measurementPath.getMeasurementSchema().getMeasurementId());
+ }
} else {
totalSize += PARTIAL_PATH_INSTANCE_SIZE;
totalSize += RamUsageEstimator.sizeOf(partialPath.getMeasurement());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 3b406bdb48c..65dcd830195 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -141,6 +141,9 @@ public class Coordinator {
}
return result;
} finally {
+ if (queryContext != null) {
+ queryContext.releaseMemoryForFrontEnd();
+ }
if (queryContext != null &&
!queryContext.getAcquiredLockNumMap().isEmpty()) {
Map<SchemaLockType, Integer> lockMap =
queryContext.getAcquiredLockNumMap();
for (Map.Entry<SchemaLockType, Integer> entry : lockMap.entrySet()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 15d05e3430b..3ed7bbfd169 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -291,20 +291,21 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList,
queryStatement);
}
- outputExpressions = analyzeSelect(analysis, queryStatement,
schemaTree, deviceList);
+ outputExpressions =
+ analyzeSelect(analysis, queryStatement, schemaTree, deviceList,
context);
if (outputExpressions.isEmpty()) {
return finishQuery(queryStatement, analysis);
}
- analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList);
+ analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList,
context);
if (deviceList.isEmpty()) {
return finishQuery(queryStatement, analysis, outputExpressions);
}
analysis.setDeviceList(deviceList);
- analyzeDeviceToGroupBy(analysis, queryStatement, schemaTree,
deviceList);
- analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree,
deviceList);
- analyzeHaving(analysis, queryStatement, schemaTree, deviceList);
+ analyzeDeviceToGroupBy(analysis, queryStatement, schemaTree,
deviceList, context);
+ analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree,
deviceList, context);
+ analyzeHaving(analysis, queryStatement, schemaTree, deviceList,
context);
analyzeDeviceToAggregation(analysis, queryStatement);
analyzeDeviceToSourceTransform(analysis, queryStatement);
@@ -321,22 +322,25 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
new
GroupByLevelHelper(queryStatement.getGroupByLevelComponent().getLevels());
outputExpressions =
- analyzeGroupByLevelSelect(analysis, queryStatement, schemaTree,
groupByLevelHelper);
+ analyzeGroupByLevelSelect(
+ analysis, queryStatement, schemaTree, groupByLevelHelper,
context);
if (outputExpressions.isEmpty()) {
return finishQuery(queryStatement, analysis);
}
analysis.setOutputExpressions(outputExpressions);
setSelectExpressions(analysis, queryStatement, outputExpressions);
- analyzeGroupByLevelHaving(analysis, queryStatement, schemaTree,
groupByLevelHelper);
+ analyzeGroupByLevelHaving(
+ analysis, queryStatement, schemaTree, groupByLevelHelper,
context);
- analyzeGroupByLevelOrderBy(analysis, queryStatement, schemaTree,
groupByLevelHelper);
+ analyzeGroupByLevelOrderBy(
+ analysis, queryStatement, schemaTree, groupByLevelHelper,
context);
checkDataTypeConsistencyInGroupByLevel(
analysis, groupByLevelHelper.getGroupByLevelExpressions());
analysis.setCrossGroupByExpressions(groupByLevelHelper.getGroupByLevelExpressions());
} else {
- outputExpressions = analyzeSelect(analysis, queryStatement,
schemaTree);
+ outputExpressions = analyzeSelect(analysis, queryStatement,
schemaTree, context);
analyzeGroupByTag(analysis, queryStatement, outputExpressions);
@@ -346,17 +350,17 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analysis.setOutputExpressions(outputExpressions);
setSelectExpressions(analysis, queryStatement, outputExpressions);
- analyzeHaving(analysis, queryStatement, schemaTree);
+ analyzeHaving(analysis, queryStatement, schemaTree, context);
- analyzeOrderBy(analysis, queryStatement, schemaTree);
+ analyzeOrderBy(analysis, queryStatement, schemaTree, context);
}
// analyze aggregation
analyzeAggregation(analysis, queryStatement);
// analyze aggregation input
- analyzeGroupBy(analysis, queryStatement, schemaTree);
- analyzeWhere(analysis, queryStatement, schemaTree);
+ analyzeGroupBy(analysis, queryStatement, schemaTree, context);
+ analyzeWhere(analysis, queryStatement, schemaTree, context);
if (analysis.getWhereExpression() != null
&& analysis.getWhereExpression().equals(ConstantOperand.FALSE)) {
return finishQuery(queryStatement, analysis, outputExpressions);
@@ -395,7 +399,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
queryStatement =
(QueryStatement)
concatPathRewriter.rewrite(
- queryStatement, new
PathPatternTree(queryStatement.useWildcard()));
+ queryStatement, new
PathPatternTree(queryStatement.useWildcard()), context);
analysis.setStatement(queryStatement);
// request schema fetch API
@@ -495,7 +499,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
for (ResultColumn resultColumn :
queryStatement.getSelectComponent().getResultColumns()) {
selectExpressions.add(resultColumn.getExpression());
}
- analyzeLastSource(analysis, selectExpressions, schemaTree);
+ analyzeLastSource(analysis, selectExpressions, schemaTree, context);
analysis.setRespDatasetHeader(DatasetHeaderFactory.getLastQueryHeader());
@@ -506,14 +510,17 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
private void analyzeLastSource(
- Analysis analysis, List<Expression> selectExpressions, ISchemaTree
schemaTree) {
+ Analysis analysis,
+ List<Expression> selectExpressions,
+ ISchemaTree schemaTree,
+ MPPQueryContext context) {
Set<Expression> sourceExpressions = new LinkedHashSet<>();
Set<Expression> lastQueryBaseExpressions = new LinkedHashSet<>();
Map<Expression, List<Expression>>
lastQueryNonWritableViewSourceExpressionMap = null;
for (Expression selectExpression : selectExpressions) {
for (Expression lastQuerySourceExpression :
- bindSchemaForExpression(selectExpression, schemaTree)) {
+ bindSchemaForExpression(selectExpression, schemaTree, context)) {
if (lastQuerySourceExpression instanceof TimeSeriesOperand) {
lastQueryBaseExpressions.add(lastQuerySourceExpression);
sourceExpressions.add(lastQuerySourceExpression);
@@ -584,7 +591,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- GroupByLevelHelper groupByLevelHelper) {
+ GroupByLevelHelper groupByLevelHelper,
+ MPPQueryContext queryContext) {
Map<Integer, Set<Pair<Expression, String>>> outputExpressionMap = new
HashMap<>();
int columnIndex = 0;
@@ -592,7 +600,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Set<Pair<Expression, String>> outputExpressionSet = new
LinkedHashSet<>();
List<Expression> resultExpressions =
- bindSchemaForExpression(resultColumn.getExpression(), schemaTree);
+ bindSchemaForExpression(resultColumn.getExpression(), schemaTree,
queryContext);
boolean isCountStar =
resultColumn.getExpression().getExpressionType().equals(ExpressionType.FUNCTION)
&& ((FunctionExpression)
resultColumn.getExpression()).isCountStar();
@@ -640,7 +648,10 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
/** process select component for align by time. */
private List<Pair<Expression, String>> analyzeSelect(
- Analysis analysis, QueryStatement queryStatement, ISchemaTree
schemaTree) {
+ Analysis analysis,
+ QueryStatement queryStatement,
+ ISchemaTree schemaTree,
+ MPPQueryContext queryContext) {
Map<Integer, List<Pair<Expression, String>>> outputExpressionMap = new
HashMap<>();
ColumnPaginationController paginationController =
@@ -654,7 +665,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
for (ResultColumn resultColumn :
queryStatement.getSelectComponent().getResultColumns()) {
List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
List<Expression> resultExpressions =
- bindSchemaForExpression(resultColumn.getExpression(), schemaTree);
+ bindSchemaForExpression(resultColumn.getExpression(), schemaTree,
queryContext);
for (Expression resultExpression : resultExpressions) {
if (paginationController.hasCurOffset()) {
@@ -710,7 +721,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- List<PartialPath> deviceList) {
+ List<PartialPath> deviceList,
+ MPPQueryContext queryContext) {
List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
Map<String, Set<Expression>> deviceToSelectExpressions = new HashMap<>();
ColumnPaginationController paginationController =
@@ -726,7 +738,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
new LinkedHashMap<>();
for (PartialPath device : deviceList) {
List<Expression> selectExpressionsOfOneDevice =
- concatDeviceAndBindSchemaForExpression(selectExpression, device,
schemaTree);
+ concatDeviceAndBindSchemaForExpression(
+ selectExpression, device, schemaTree, queryContext);
if (selectExpressionsOfOneDevice.isEmpty()) {
continue;
}
@@ -863,14 +876,16 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- UnaryOperator<Expression> havingExpressionAnalyzer) {
+ UnaryOperator<Expression> havingExpressionAnalyzer,
+ MPPQueryContext queryContext) {
// get removeWildcard Expressions in Having
List<Expression> conJunctions =
ExpressionAnalyzer.bindSchemaForPredicate(
queryStatement.getHavingCondition().getPredicate(),
queryStatement.getFromComponent().getPrefixPaths(),
schemaTree,
- true);
+ true,
+ queryContext);
Expression havingExpression =
PredicateUtils.combineConjuncts(
conJunctions.stream().distinct().collect(Collectors.toList()));
@@ -888,20 +903,28 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
private void analyzeHaving(
- Analysis analysis, QueryStatement queryStatement, ISchemaTree
schemaTree) {
+ Analysis analysis,
+ QueryStatement queryStatement,
+ ISchemaTree schemaTree,
+ MPPQueryContext queryContext) {
if (!queryStatement.hasHaving()) {
return;
}
analyzeHavingBase(
- analysis, queryStatement, schemaTree,
ExpressionAnalyzer::normalizeExpression);
+ analysis,
+ queryStatement,
+ schemaTree,
+ ExpressionAnalyzer::normalizeExpression,
+ queryContext);
}
private void analyzeGroupByLevelHaving(
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- GroupByLevelHelper groupByLevelHelper) {
+ GroupByLevelHelper groupByLevelHelper,
+ MPPQueryContext queryContext) {
if (!queryStatement.hasHaving()) {
return;
}
@@ -912,7 +935,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
schemaTree,
havingExpression ->
PredicateUtils.removeDuplicateConjunct(
- groupByLevelHelper.applyLevels(havingExpression, analysis)));
+ groupByLevelHelper.applyLevels(havingExpression, analysis)),
+ queryContext);
// update groupByLevelExpressions
groupByLevelHelper.updateGroupByLevelExpressions(analysis.getHavingExpression());
}
@@ -921,7 +945,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- List<PartialPath> deviceSet) {
+ List<PartialPath> deviceSet,
+ MPPQueryContext queryContext) {
if (!queryStatement.hasHaving()) {
return;
}
@@ -937,7 +962,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
for (PartialPath device : deviceSet) {
List<Expression> expressionsInHaving =
- concatDeviceAndBindSchemaForExpression(havingExpression, device,
schemaTree);
+ concatDeviceAndBindSchemaForExpression(
+ havingExpression, device, schemaTree, queryContext);
conJunctions.addAll(
expressionsInHaving.stream()
@@ -1341,7 +1367,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- List<PartialPath> deviceSet) {
+ List<PartialPath> deviceSet,
+ MPPQueryContext queryContext) {
if (!queryStatement.hasWhere()) {
return;
}
@@ -1352,7 +1379,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
while (deviceIterator.hasNext()) {
PartialPath devicePath = deviceIterator.next();
Expression whereExpression =
- analyzeWhereSplitByDevice(queryStatement, devicePath, schemaTree);
+ analyzeWhereSplitByDevice(queryStatement, devicePath, schemaTree,
queryContext);
if (whereExpression.equals(ConstantOperand.FALSE)) {
deviceIterator.remove();
} else if (whereExpression.equals(ConstantOperand.TRUE)) {
@@ -1371,7 +1398,10 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
private void analyzeWhere(
- Analysis analysis, QueryStatement queryStatement, ISchemaTree
schemaTree) {
+ Analysis analysis,
+ QueryStatement queryStatement,
+ ISchemaTree schemaTree,
+ MPPQueryContext queryContext) {
if (!queryStatement.hasWhere()) {
return;
}
@@ -1380,7 +1410,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
queryStatement.getWhereCondition().getPredicate(),
queryStatement.getFromComponent().getPrefixPaths(),
schemaTree,
- true);
+ true,
+ queryContext);
Expression whereExpression =
convertConJunctionsToWhereExpression(conJunctions);
if (whereExpression.equals(ConstantOperand.TRUE)) {
analysis.setWhereExpression(null);
@@ -1396,10 +1427,17 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
private Expression analyzeWhereSplitByDevice(
- QueryStatement queryStatement, PartialPath devicePath, ISchemaTree
schemaTree) {
+ final QueryStatement queryStatement,
+ final PartialPath devicePath,
+ final ISchemaTree schemaTree,
+ final MPPQueryContext queryContext) {
List<Expression> conJunctions =
ExpressionAnalyzer.concatDeviceAndBindSchemaForPredicate(
- queryStatement.getWhereCondition().getPredicate(), devicePath,
schemaTree, true);
+ queryStatement.getWhereCondition().getPredicate(),
+ devicePath,
+ schemaTree,
+ true,
+ queryContext);
return convertConJunctionsToWhereExpression(conJunctions);
}
@@ -1568,11 +1606,13 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- UnaryOperator<List<Expression>> orderByExpressionAnalyzer) {
+ UnaryOperator<List<Expression>> orderByExpressionAnalyzer,
+ MPPQueryContext queryContext) {
Set<Expression> orderByExpressions = new LinkedHashSet<>();
for (Expression expressionForItem :
queryStatement.getExpressionSortItemList()) {
// Expression in a sortItem only indicates one column
- List<Expression> expressions =
bindSchemaForExpression(expressionForItem, schemaTree);
+ List<Expression> expressions =
+ bindSchemaForExpression(expressionForItem, schemaTree, queryContext);
if (expressions.isEmpty()) {
throw new SemanticException(
String.format(
@@ -1600,19 +1640,24 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
private void analyzeOrderBy(
- Analysis analysis, QueryStatement queryStatement, ISchemaTree
schemaTree) {
+ Analysis analysis,
+ QueryStatement queryStatement,
+ ISchemaTree schemaTree,
+ MPPQueryContext queryContext) {
if (!queryStatement.hasOrderByExpression()) {
return;
}
- analyzeOrderByBase(analysis, queryStatement, schemaTree, expressions ->
expressions);
+ analyzeOrderByBase(
+ analysis, queryStatement, schemaTree, expressions -> expressions,
queryContext);
}
private void analyzeGroupByLevelOrderBy(
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- GroupByLevelHelper groupByLevelHelper) {
+ GroupByLevelHelper groupByLevelHelper,
+ MPPQueryContext queryContext) {
if (!queryStatement.hasOrderByExpression()) {
return;
}
@@ -1627,7 +1672,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
groupedExpressions.add(groupByLevelHelper.applyLevels(expression,
analysis));
}
return new ArrayList<>(groupedExpressions);
- });
+ },
+ queryContext);
// update groupByLevelExpressions
for (Expression orderByExpression : analysis.getOrderByExpressions()) {
groupByLevelHelper.updateGroupByLevelExpressions(orderByExpression);
@@ -1642,7 +1688,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- List<PartialPath> deviceSet) {
+ List<PartialPath> deviceSet,
+ MPPQueryContext queryContext) {
if (queryStatement.getGroupByComponent() == null) {
return;
}
@@ -1654,7 +1701,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Expression expression = groupByComponent.getControlColumnExpression();
for (PartialPath device : deviceSet) {
List<Expression> groupByExpressionsOfOneDevice =
- concatDeviceAndBindSchemaForExpression(expression, device,
schemaTree);
+ concatDeviceAndBindSchemaForExpression(expression, device,
schemaTree, queryContext);
if (groupByExpressionsOfOneDevice.isEmpty()) {
throw new SemanticException(
@@ -1713,7 +1760,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- List<PartialPath> deviceSet) {
+ List<PartialPath> deviceSet,
+ MPPQueryContext queryContext) {
if (!queryStatement.hasOrderByExpression()) {
return;
}
@@ -1726,7 +1774,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Set<Expression> orderByExpressionsForOneDevice = new LinkedHashSet<>();
for (Expression expressionForItem :
queryStatement.getExpressionSortItemList()) {
List<Expression> expressions =
- concatDeviceAndBindSchemaForExpression(expressionForItem, device,
schemaTree);
+ concatDeviceAndBindSchemaForExpression(
+ expressionForItem, device, schemaTree, queryContext);
if (expressions.isEmpty()) {
throw new SemanticException(
String.format(
@@ -1763,7 +1812,10 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
private void analyzeGroupBy(
- Analysis analysis, QueryStatement queryStatement, ISchemaTree
schemaTree) {
+ Analysis analysis,
+ QueryStatement queryStatement,
+ ISchemaTree schemaTree,
+ MPPQueryContext queryContext) {
if (queryStatement.getGroupByComponent() == null) {
return;
@@ -1775,7 +1827,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
if (queryStatement.hasGroupByExpression()) {
groupByExpression = groupByComponent.getControlColumnExpression();
// Expression in group by variation clause only indicates one column
- List<Expression> expressions =
bindSchemaForExpression(groupByExpression, schemaTree);
+ List<Expression> expressions =
+ bindSchemaForExpression(groupByExpression, schemaTree, queryContext);
if (expressions.isEmpty()) {
throw new SemanticException(
String.format(
@@ -2891,7 +2944,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analysis,
Collections.singletonList(
new TimeSeriesOperand(showTimeSeriesStatement.getPathPattern())),
- schemaTree);
+ schemaTree,
+ context);
analyzeDataPartition(analysis, new QueryStatement(), schemaTree,
context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ConcatPathRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ConcatPathRewriter.java
index 7ac927ac643..041f08e1b5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ConcatPathRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ConcatPathRewriter.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.path.PathPatternTreeUtils;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.component.ResultColumn;
@@ -48,7 +49,8 @@ public class ConcatPathRewriter {
return patternTree;
}
- public Statement rewrite(Statement statement, PathPatternTree patternTree)
+ public Statement rewrite(
+ Statement statement, PathPatternTree patternTree, MPPQueryContext
queryContext)
throws StatementAnalyzeException {
QueryStatement queryStatement = (QueryStatement) statement;
this.patternTree = patternTree;
@@ -75,7 +77,7 @@ public class ConcatPathRewriter {
} else {
// concat SELECT with FROM
List<ResultColumn> resultColumns =
- concatSelectWithFrom(queryStatement.getSelectComponent(),
prefixPaths);
+ concatSelectWithFrom(queryStatement.getSelectComponent(),
prefixPaths, queryContext);
queryStatement.getSelectComponent().setResultColumns(resultColumns);
// concat GROUP BY with FROM
@@ -85,12 +87,13 @@ public class ConcatPathRewriter {
.setControlColumnExpression(
contactGroupByWithFrom(
queryStatement.getGroupByComponent().getControlColumnExpression(),
- prefixPaths));
+ prefixPaths,
+ queryContext));
}
if (queryStatement.hasOrderByExpression()) {
List<Expression> sortItemExpressions =
queryStatement.getExpressionSortItemList();
sortItemExpressions.replaceAll(
- expression -> contactOrderByWithFrom(expression, prefixPaths));
+ expression -> contactOrderByWithFrom(expression, prefixPaths,
queryContext));
}
}
@@ -119,14 +122,16 @@ public class ConcatPathRewriter {
* path pattern. And construct pattern tree.
*/
private List<ResultColumn> concatSelectWithFrom(
- SelectComponent selectComponent, List<PartialPath> prefixPaths)
+ final SelectComponent selectComponent,
+ final List<PartialPath> prefixPaths,
+ final MPPQueryContext queryContext)
throws StatementAnalyzeException {
// resultColumns after concat
List<ResultColumn> resultColumns = new ArrayList<>();
for (ResultColumn resultColumn : selectComponent.getResultColumns()) {
List<Expression> resultExpressions =
ExpressionAnalyzer.concatExpressionWithSuffixPaths(
- resultColumn.getExpression(), prefixPaths, patternTree);
+ resultColumn.getExpression(), prefixPaths, patternTree,
queryContext);
for (Expression resultExpression : resultExpressions) {
resultColumns.add(
new ResultColumn(
@@ -136,18 +141,26 @@ public class ConcatPathRewriter {
return resultColumns;
}
- private Expression contactGroupByWithFrom(Expression expression,
List<PartialPath> prefixPaths) {
+ private Expression contactGroupByWithFrom(
+ final Expression expression,
+ final List<PartialPath> prefixPaths,
+ final MPPQueryContext queryContext) {
List<Expression> resultExpressions =
- ExpressionAnalyzer.concatExpressionWithSuffixPaths(expression,
prefixPaths, patternTree);
+ ExpressionAnalyzer.concatExpressionWithSuffixPaths(
+ expression, prefixPaths, patternTree, queryContext);
if (resultExpressions.size() != 1) {
throw new IllegalStateException("Expression in group by should indicate
one value");
}
return resultExpressions.get(0);
}
- private Expression contactOrderByWithFrom(Expression expression,
List<PartialPath> prefixPaths) {
+ private Expression contactOrderByWithFrom(
+ final Expression expression,
+ final List<PartialPath> prefixPaths,
+ final MPPQueryContext queryContext) {
List<Expression> resultExpressions =
- ExpressionAnalyzer.concatExpressionWithSuffixPaths(expression,
prefixPaths, patternTree);
+ ExpressionAnalyzer.concatExpressionWithSuffixPaths(
+ expression, prefixPaths, patternTree, queryContext);
if (resultExpressions.size() != 1) {
throw new IllegalStateException("Expression in order by should indicate
one value");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java
index 010cf8fcd01..ebc52a86a62 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction;
import
org.apache.iotdb.commons.udf.builtin.BuiltinTimeSeriesGeneratingFunction;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
@@ -264,11 +265,15 @@ public class ExpressionAnalyzer {
* @return the concatenated expression list
*/
public static List<Expression> concatExpressionWithSuffixPaths(
- Expression expression, List<PartialPath> prefixPaths, PathPatternTree
patternTree) {
+ final Expression expression,
+ final List<PartialPath> prefixPaths,
+ final PathPatternTree patternTree,
+ final MPPQueryContext queryContext) {
return new ConcatExpressionWithSuffixPathsVisitor()
.process(
expression,
- new ConcatExpressionWithSuffixPathsVisitor.Context(prefixPaths,
patternTree));
+ new ConcatExpressionWithSuffixPathsVisitor.Context(
+ prefixPaths, patternTree, queryContext));
}
/**
@@ -405,8 +410,11 @@ public class ExpressionAnalyzer {
* expressions
*/
public static List<Expression> bindSchemaForExpression(
- Expression expression, ISchemaTree schemaTree) {
- return new BindSchemaForExpressionVisitor().process(expression,
schemaTree);
+ final Expression expression,
+ final ISchemaTree schemaTree,
+ final MPPQueryContext queryContext) {
+ return new BindSchemaForExpressionVisitor()
+ .process(expression, new
BindSchemaForExpressionVisitor.Context(schemaTree, queryContext));
}
/**
@@ -419,10 +427,16 @@ public class ExpressionAnalyzer {
* @return the expression list with full path and after binding schema
*/
public static List<Expression> bindSchemaForPredicate(
- Expression predicate, List<PartialPath> prefixPaths, ISchemaTree
schemaTree, boolean isRoot) {
+ final Expression predicate,
+ final List<PartialPath> prefixPaths,
+ final ISchemaTree schemaTree,
+ final boolean isRoot,
+ final MPPQueryContext queryContext) {
return new BindSchemaForPredicateVisitor()
.process(
- predicate, new BindSchemaForPredicateVisitor.Context(prefixPaths,
schemaTree, isRoot));
+ predicate,
+ new BindSchemaForPredicateVisitor.Context(
+ prefixPaths, schemaTree, isRoot, queryContext));
}
public static Expression replaceRawPathWithGroupedPath(
@@ -445,11 +459,15 @@ public class ExpressionAnalyzer {
* @return expression list with full path and after binding schema
*/
public static List<Expression> concatDeviceAndBindSchemaForExpression(
- Expression expression, PartialPath devicePath, ISchemaTree schemaTree) {
+ final Expression expression,
+ final PartialPath devicePath,
+ final ISchemaTree schemaTree,
+ final MPPQueryContext queryContext) {
return new ConcatDeviceAndBindSchemaForExpressionVisitor()
.process(
expression,
- new
ConcatDeviceAndBindSchemaForExpressionVisitor.Context(devicePath, schemaTree));
+ new ConcatDeviceAndBindSchemaForExpressionVisitor.Context(
+ devicePath, schemaTree, queryContext));
}
/**
@@ -459,12 +477,16 @@ public class ExpressionAnalyzer {
* @return the expression list with full path and after binding schema
*/
public static List<Expression> concatDeviceAndBindSchemaForPredicate(
- Expression predicate, PartialPath devicePath, ISchemaTree schemaTree,
boolean isWhere) {
+ final Expression predicate,
+ final PartialPath devicePath,
+ final ISchemaTree schemaTree,
+ final boolean isWhere,
+ final MPPQueryContext queryContext) {
return new ConcatDeviceAndBindSchemaForPredicateVisitor()
.process(
predicate,
new ConcatDeviceAndBindSchemaForPredicateVisitor.Context(
- devicePath, schemaTree, isWhere));
+ devicePath, schemaTree, isWhere, queryContext));
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java
index 04b348a2462..fc9d3a0aa51 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.analyze;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
import
org.apache.iotdb.db.queryengine.plan.expression.UnknownExpressionTypeException;
@@ -61,11 +62,16 @@ public class ExpressionUtils {
// util class
}
- public static List<Expression> reconstructTimeSeriesOperands(
- TimeSeriesOperand rawExpression, List<? extends PartialPath>
actualPaths) {
+ /* Use queryContext to record the memory usage of the constructed
Expression. */
+ public static List<Expression> reconstructTimeSeriesOperandsWithMemoryCheck(
+ final TimeSeriesOperand rawExpression,
+ final List<? extends PartialPath> actualPaths,
+ final MPPQueryContext queryContext) {
List<Expression> resultExpressions = new ArrayList<>();
for (PartialPath actualPath : actualPaths) {
- resultExpressions.add(reconstructTimeSeriesOperand(rawExpression,
actualPath));
+ resultExpressions.add(
+ reserveMemoryForExpression(
+ queryContext, reconstructTimeSeriesOperand(rawExpression,
actualPath)));
}
return resultExpressions;
}
@@ -76,11 +82,15 @@ public class ExpressionUtils {
return cloneCommonFields(rawExpression, resultExpression);
}
- public static List<Expression> reconstructFunctionExpressions(
- FunctionExpression expression, List<List<Expression>>
childExpressionsList) {
+ public static List<Expression> reconstructFunctionExpressionsWithMemoryCheck(
+ final FunctionExpression expression,
+ final List<List<Expression>> childExpressionsList,
+ final MPPQueryContext queryContext) {
List<Expression> resultExpressions = new ArrayList<>();
for (List<Expression> functionExpressions : childExpressionsList) {
- resultExpressions.add(reconstructFunctionExpression(expression,
functionExpressions));
+ resultExpressions.add(
+ reserveMemoryForExpression(
+ queryContext, reconstructFunctionExpression(expression,
functionExpressions)));
}
return resultExpressions;
}
@@ -107,11 +117,15 @@ public class ExpressionUtils {
return cloneCommonFields(rawExpression, resultExpression);
}
- public static List<Expression> reconstructUnaryExpressions(
- UnaryExpression expression, List<Expression> childExpressions) {
+ public static List<Expression> reconstructUnaryExpressionsWithMemoryCheck(
+ final UnaryExpression expression,
+ final List<Expression> childExpressions,
+ final MPPQueryContext queryContext) {
List<Expression> resultExpressions = new ArrayList<>();
for (Expression childExpression : childExpressions) {
- resultExpressions.add(reconstructUnaryExpression(expression,
childExpression));
+ resultExpressions.add(
+ reserveMemoryForExpression(
+ queryContext, reconstructUnaryExpression(expression,
childExpression)));
}
return resultExpressions;
}
@@ -172,14 +186,17 @@ public class ExpressionUtils {
return cloneCommonFields(rawExpression, resultExpression);
}
- public static List<Expression> reconstructBinaryExpressions(
- BinaryExpression expression,
- List<Expression> leftExpressions,
- List<Expression> rightExpressions) {
+ public static List<Expression> reconstructBinaryExpressionsWithMemoryCheck(
+ final BinaryExpression expression,
+ final List<Expression> leftExpressions,
+ final List<Expression> rightExpressions,
+ final MPPQueryContext queryContext) {
List<Expression> resultExpressions = new ArrayList<>();
for (Expression le : leftExpressions) {
for (Expression re : rightExpressions) {
- resultExpressions.add(reconstructBinaryExpression(expression, le, re));
+ resultExpressions.add(
+ reserveMemoryForExpression(
+ queryContext, reconstructBinaryExpression(expression, le,
re)));
}
}
return resultExpressions;
@@ -238,16 +255,19 @@ public class ExpressionUtils {
return cloneCommonFields(rawExpression, resultExpression);
}
- public static List<Expression> reconstructTernaryExpressions(
- TernaryExpression expression,
- List<Expression> firstExpressions,
- List<Expression> secondExpressions,
- List<Expression> thirdExpressions) {
+ public static List<Expression> reconstructTernaryExpressionsWithMemoryCheck(
+ final TernaryExpression expression,
+ final List<Expression> firstExpressions,
+ final List<Expression> secondExpressions,
+ final List<Expression> thirdExpressions,
+ final MPPQueryContext queryContext) {
List<Expression> resultExpressions = new ArrayList<>();
for (Expression fe : firstExpressions) {
for (Expression se : secondExpressions)
for (Expression te : thirdExpressions) {
- resultExpressions.add(reconstructTernaryExpression(expression, fe,
se, te));
+ resultExpressions.add(
+ reserveMemoryForExpression(
+ queryContext, reconstructTernaryExpression(expression, fe,
se, te)));
}
}
return resultExpressions;
@@ -278,6 +298,12 @@ public class ExpressionUtils {
return resultExpression;
}
+ private static Expression reserveMemoryForExpression(
+ MPPQueryContext queryContext, Expression expression) {
+ queryContext.reserveMemoryForFrontEnd(expression == null ? 0 :
expression.ramBytesUsed());
+ return expression;
+ }
+
/**
* Make cartesian product. Attention, in this implementation, the way to
handle the empty set is
* to ignore it instead of making the result an empty set.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
index 856476eae24..87bea8c61a9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
@@ -129,6 +129,8 @@ public class TemplatedAnalyze {
TimeSeriesOperand measurementPath =
new TimeSeriesOperand(
new MeasurementPath(new String[] {measurementName},
measurementSchema));
+ // reserve memory for this expression
+ context.reserveMemoryForFrontEnd(measurementPath.ramBytesUsed());
outputExpressions.add(new Pair<>(measurementPath, null));
paginationController.consumeLimit();
} else {
@@ -150,6 +152,8 @@ public class TemplatedAnalyze {
TimeSeriesOperand measurementPath =
new TimeSeriesOperand(
new MeasurementPath(new String[] {measurementName},
measurementSchema));
+ // reserve memory for this expression
+ context.reserveMemoryForFrontEnd(measurementPath.ramBytesUsed());
outputExpressions.add(new Pair<>(measurementPath,
resultColumn.getAlias()));
} else {
break;
@@ -183,7 +187,7 @@ public class TemplatedAnalyze {
}
analysis.setDeviceList(deviceList);
- analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree, deviceList);
+ analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree, deviceList,
context);
analyzeDeviceToSourceTransform(analysis);
analyzeDeviceToSource(analysis);
@@ -272,7 +276,8 @@ public class TemplatedAnalyze {
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- List<PartialPath> deviceSet) {
+ List<PartialPath> deviceSet,
+ MPPQueryContext queryContext) {
if (!queryStatement.hasOrderByExpression()) {
return;
}
@@ -285,7 +290,8 @@ public class TemplatedAnalyze {
Set<Expression> orderByExpressionsForOneDevice = new LinkedHashSet<>();
for (Expression expressionForItem :
queryStatement.getExpressionSortItemList()) {
List<Expression> expressions =
- concatDeviceAndBindSchemaForExpression(expressionForItem, device,
schemaTree);
+ concatDeviceAndBindSchemaForExpression(
+ expressionForItem, device, schemaTree, queryContext);
if (expressions.isEmpty()) {
throw new SemanticException(
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 9724a957b52..40a188e3d6a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -179,6 +179,9 @@ public class QueryExecution implements IQueryExecution {
PERFORMANCE_OVERVIEW_METRICS.recordPlanCost(System.nanoTime() - startTime);
schedule();
+ // The last batch of memory reserved by the front end
+ context.reserveMemoryForFrontEndImmediately();
+
// friendly for gc
logicalPlan.clearUselessMemory();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/Expression.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/Expression.java
index 48b1b5806dc..9b89a122b06 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/Expression.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/Expression.java
@@ -56,6 +56,7 @@ import
org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssi
import org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFExecutor;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Accountable;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -71,7 +72,7 @@ import java.util.NoSuchElementException;
import java.util.Objects;
/** A skeleton class for expression */
-public abstract class Expression extends StatementNode {
+public abstract class Expression extends StatementNode implements Accountable {
/////////////////////////////////////////////////////////////////////////////////////////////////
// Operations that Class Expression is not responsible for should be done
through a visitor
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/binary/BinaryExpression.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/binary/BinaryExpression.java
index 793e09d98f8..39189759a60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/binary/BinaryExpression.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/binary/BinaryExpression.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.expression.binary;
import org.apache.iotdb.db.queryengine.common.NodeRef;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
@@ -27,6 +28,7 @@ import
org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssi
import org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFExecutor;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.RamUsageEstimator;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -38,6 +40,9 @@ import java.util.Map;
public abstract class BinaryExpression extends Expression {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(BinaryExpression.class);
+
protected Expression leftExpression;
protected Expression rightExpression;
@@ -156,6 +161,13 @@ public abstract class BinaryExpression extends Expression {
return buildExpression(left, right);
}
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(leftExpression)
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(rightExpression);
+ }
+
private String buildExpression(String left, String right) {
StringBuilder builder = new StringBuilder();
if (leftExpression.getExpressionType().getPriority() <
this.getExpressionType().getPriority()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/ConstantOperand.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/ConstantOperand.java
index 92a7a7b328c..f4527cdc755 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/ConstantOperand.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/ConstantOperand.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation
import
org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssigner;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -40,6 +41,9 @@ public class ConstantOperand extends LeafOperand {
public static final ConstantOperand FALSE = new
ConstantOperand(TSDataType.BOOLEAN, "false");
public static final ConstantOperand TRUE = new
ConstantOperand(TSDataType.BOOLEAN, "true");
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(ConstantOperand.class);
+
private final String valueString;
private final TSDataType dataType;
@@ -116,4 +120,9 @@ public class ConstantOperand extends LeafOperand {
dataType.serializeTo(stream);
ReadWriteIOUtils.write(valueString, stream);
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE + RamUsageEstimator.sizeOf(valueString);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/NullOperand.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/NullOperand.java
index 4c7181b9f81..4e2781a133c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/NullOperand.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/NullOperand.java
@@ -24,6 +24,8 @@ import
org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import
org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssigner;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -31,6 +33,9 @@ import java.util.List;
import java.util.Map;
public class NullOperand extends LeafOperand {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(NullOperand.class);
+
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitNullOperand(this, context);
@@ -71,4 +76,9 @@ public class NullOperand extends LeafOperand {
protected void serialize(DataOutputStream stream) throws IOException {
// do nothing
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimeSeriesOperand.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimeSeriesOperand.java
index a05b2577d93..22ded4d95d7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimeSeriesOperand.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimeSeriesOperand.java
@@ -22,12 +22,14 @@ package
org.apache.iotdb.db.queryengine.plan.expression.leaf;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import
org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssigner;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.RamUsageEstimator;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -37,6 +39,9 @@ import java.util.Map;
public class TimeSeriesOperand extends LeafOperand {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(TimeSeriesOperand.class);
+
private PartialPath path;
public TimeSeriesOperand(PartialPath path) {
@@ -106,4 +111,9 @@ public class TimeSeriesOperand extends LeafOperand {
protected void serialize(DataOutputStream stream) throws IOException {
path.serialize(stream);
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE +
MemoryEstimationHelper.getEstimatedSizeOfPartialPath(path);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimestampOperand.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimestampOperand.java
index ece2f5ad9ef..6c817a20336 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimestampOperand.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimestampOperand.java
@@ -24,6 +24,8 @@ import
org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import
org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssigner;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -32,6 +34,9 @@ import java.util.Map;
public class TimestampOperand extends LeafOperand {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(TimestampOperand.class); // shallow size of this class; returned by ramBytesUsed()
+
public static final String TIMESTAMP_EXPRESSION_STRING = "Time";
public TimestampOperand() {
@@ -82,4 +87,9 @@ public class TimestampOperand extends LeafOperand {
protected void serialize(DataOutputStream stream) throws IOException {
// do nothing
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java
index 4a0ff5458cf..a17864ff3be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction;
import org.apache.iotdb.commons.udf.service.UDFManagementService;
import org.apache.iotdb.db.queryengine.common.NodeRef;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
@@ -37,6 +38,7 @@ import
org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFInformationInf
import org.apache.iotdb.udf.api.customizer.strategy.AccessStrategy;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -53,6 +55,9 @@ import java.util.stream.Collectors;
public class FunctionExpression extends Expression {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(FunctionExpression.class);
+
private FunctionType functionType;
private final String functionName;
@@ -392,4 +397,25 @@ public class FunctionExpression extends Expression {
Expression.serialize(expression, stream);
}
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + RamUsageEstimator.sizeOf(functionName)
+ + RamUsageEstimator.sizeOf(parametersString)
+ + RamUsageEstimator.sizeOfMap(functionAttributes)
+ + (expressions == null
+ ? 0
+ : expressions.stream()
+
.mapToLong(MemoryEstimationHelper::getEstimatedSizeOfAccountableObject)
+ .sum())
+ + (paths == null
+ ? 0
+ :
paths.stream().mapToLong(MemoryEstimationHelper::getEstimatedSizeOfPartialPath).sum())
+ + (countTimeExpressions == null
+ ? 0
+ : countTimeExpressions.stream()
+
.mapToLong(MemoryEstimationHelper::getEstimatedSizeOfAccountableObject)
+ .sum());
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/CaseWhenThenExpression.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/CaseWhenThenExpression.java
index cd28e9f110e..142473af853 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/CaseWhenThenExpression.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/CaseWhenThenExpression.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.expression.other;
import org.apache.iotdb.db.queryengine.common.NodeRef;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
import
org.apache.iotdb.db.queryengine.plan.expression.binary.WhenThenExpression;
@@ -31,6 +32,7 @@ import
org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFExecutor;
import org.apache.commons.lang3.Validate;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -42,6 +44,9 @@ import java.util.List;
import java.util.Map;
public class CaseWhenThenExpression extends Expression {
+
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(CaseWhenThenExpression.class);
protected List<WhenThenExpression> whenThenExpressions = new ArrayList<>();
protected Expression elseExpression;
@@ -183,4 +188,15 @@ public class CaseWhenThenExpression extends Expression {
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitCaseWhenThenExpression(this, context);
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(elseExpression)
+ + (whenThenExpressions == null
+ ? 0
+ : whenThenExpressions.stream()
+
.mapToLong(MemoryEstimationHelper::getEstimatedSizeOfAccountableObject)
+ .sum());
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/GroupByTimeExpression.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/GroupByTimeExpression.java
index 21ed6187449..4ee3c174bc4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/GroupByTimeExpression.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/GroupByTimeExpression.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssi
import org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFExecutor;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.utils.TimeDuration;
@@ -41,6 +42,9 @@ import java.util.Map;
/** Only used for representing GROUP BY TIME filter. */
public class GroupByTimeExpression extends Expression {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(GroupByTimeExpression.class);
+
// [startTime, endTime]
private final long startTime;
private final long endTime;
@@ -157,4 +161,9 @@ public class GroupByTimeExpression extends Expression {
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitGroupByTimeExpression(this, context);
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/ternary/BetweenExpression.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/ternary/BetweenExpression.java
index 04025bcab62..00b307df340 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/ternary/BetweenExpression.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/ternary/BetweenExpression.java
@@ -21,10 +21,12 @@
package org.apache.iotdb.db.queryengine.plan.expression.ternary;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -32,6 +34,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
public class BetweenExpression extends TernaryExpression {
+
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(BetweenExpression.class); // concrete class, so the isNotBetween field is included
+
private final boolean isNotBetween;
public boolean isNotBetween() {
@@ -102,4 +108,12 @@ public class BetweenExpression extends TernaryExpression {
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitBetweenExpression(this, context);
}
+
+ @Override
+ public long ramBytesUsed() { // estimate = INSTANCE_SIZE plus the estimated size of each of the three operand expressions
+ return INSTANCE_SIZE
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(firstExpression)
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(secondExpression)
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(thirdExpression);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/InExpression.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/InExpression.java
index 5943b209ace..0ace411cd94 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/InExpression.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/InExpression.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.expression.unary;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
@@ -27,6 +28,7 @@ import
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.validation.constraints.NotNull;
@@ -38,7 +40,8 @@ import java.util.Iterator;
import java.util.LinkedHashSet;
public class InExpression extends UnaryExpression {
-
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(InExpression.class);
private final boolean isNotIn;
private final LinkedHashSet<String> values;
@@ -138,4 +141,11 @@ public class InExpression extends UnaryExpression {
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitInExpression(this, context);
}
+
+ @Override
+ public long ramBytesUsed() { // estimate = shallow instance size + child expression + every string held in values
+ return INSTANCE_SIZE
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression)
+ + (values == null ? 0 : // a null value set contributes nothing
values.stream().mapToLong(RamUsageEstimator::sizeOf).sum()); // NOTE(review): only the strings are summed; the LinkedHashSet's own overhead does not appear to be counted - confirm this is intended
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/IsNullExpression.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/IsNullExpression.java
index 75d55b242fd..9542b29e2da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/IsNullExpression.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/IsNullExpression.java
@@ -19,10 +19,12 @@
package org.apache.iotdb.db.queryengine.plan.expression.unary;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -30,6 +32,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
public class IsNullExpression extends UnaryExpression {
+
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(IsNullExpression.class);
private final boolean isNot;
public IsNullExpression(Expression expression, boolean isNot) {
@@ -77,4 +82,9 @@ public class IsNullExpression extends UnaryExpression {
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitIsNullExpression(this, context);
}
+
+ @Override
+ public long ramBytesUsed() { // estimate = shallow size of IsNullExpression plus the estimated size of the child expression
+ return INSTANCE_SIZE +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LikeExpression.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LikeExpression.java
index fdc158a9f76..a83824ce68d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LikeExpression.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LikeExpression.java
@@ -19,10 +19,12 @@
package org.apache.iotdb.db.queryengine.plan.expression.unary;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -35,6 +37,9 @@ import static
org.apache.tsfile.utils.RegexUtils.parseLikePatternToRegex;
public class LikeExpression extends UnaryExpression {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(LikeExpression.class);
+
private final String patternString;
private final Pattern pattern;
@@ -107,4 +112,11 @@ public class LikeExpression extends UnaryExpression {
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitLikeExpression(this, context);
}
+
+ @Override
+ public long ramBytesUsed() { // estimate = shallow size of LikeExpression + child expression + the pattern string
+ return INSTANCE_SIZE
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression)
+ + RamUsageEstimator.sizeOf(patternString); // NOTE(review): the compiled Pattern field is not part of this sum - confirm this is intended
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LogicNotExpression.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LogicNotExpression.java
index 69719cfd415..9797376e545 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LogicNotExpression.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LogicNotExpression.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.expression.unary;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
@@ -27,10 +28,15 @@ import
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
import java.nio.ByteBuffer;
public class LogicNotExpression extends UnaryExpression {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(LogicNotExpression.class);
+
public LogicNotExpression(Expression expression) {
super(expression);
}
@@ -64,4 +70,9 @@ public class LogicNotExpression extends UnaryExpression {
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitLogicNotExpression(this, context);
}
+
+ @Override
+ public long ramBytesUsed() { // estimate = shallow size of LogicNotExpression plus the estimated size of the child expression
+ return INSTANCE_SIZE +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/NegationExpression.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/NegationExpression.java
index 52df786b486..d496a05c261 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/NegationExpression.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/NegationExpression.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.expression.unary;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
@@ -27,10 +28,15 @@ import
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
import java.nio.ByteBuffer;
public class NegationExpression extends UnaryExpression {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(NegationExpression.class);
+
public NegationExpression(Expression expression) {
super(expression);
}
@@ -70,4 +76,9 @@ public class NegationExpression extends UnaryExpression {
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitNegationExpression(this, context);
}
+
+ @Override
+ public long ramBytesUsed() { // estimate = shallow size of NegationExpression plus the estimated size of the child expression
+ return INSTANCE_SIZE +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/RegularExpression.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/RegularExpression.java
index 39a1baac949..4d517e219e7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/RegularExpression.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/RegularExpression.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.db.queryengine.plan.expression.unary;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor;
import org.apache.commons.lang3.Validate;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -35,6 +37,9 @@ import static org.apache.tsfile.utils.RegexUtils.compileRegex;
public class RegularExpression extends UnaryExpression {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(RegularExpression.class);
+
private final String patternString;
private final Pattern pattern;
@@ -111,4 +116,11 @@ public class RegularExpression extends UnaryExpression {
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitRegularExpression(this, context);
}
+
+ @Override
+ public long ramBytesUsed() { // estimate = shallow size of RegularExpression + child expression + the pattern string
+ return INSTANCE_SIZE
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression)
+ + RamUsageEstimator.sizeOf(patternString); // NOTE(review): the compiled Pattern field is not part of this sum - confirm this is intended
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java
index 7383e2784eb..fdbd9c52142 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
@@ -34,6 +35,7 @@ import
org.apache.iotdb.db.queryengine.plan.expression.visitor.CompleteMeasureme
import
org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.TransformToExpressionVisitor;
import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.commons.lang3.Validate;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import java.util.ArrayList;
@@ -43,20 +45,21 @@ import java.util.List;
import java.util.stream.Collectors;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct;
-import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck;
import static
org.apache.iotdb.db.utils.TypeInferenceUtils.bindTypeForBuiltinAggregationNonSeriesInputExpressions;
import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
-public class BindSchemaForExpressionVisitor extends
CartesianProductVisitor<ISchemaTree> {
+public class BindSchemaForExpressionVisitor
+ extends CartesianProductVisitor<BindSchemaForExpressionVisitor.Context> {
@Override
public List<Expression> visitFunctionExpression(
- FunctionExpression functionExpression, ISchemaTree schemaTree) {
+ FunctionExpression functionExpression, Context context) {
if (COUNT_TIME.equalsIgnoreCase(functionExpression.getFunctionName())) {
List<Expression> usedExpressions =
functionExpression.getExpressions().stream()
- .flatMap(e -> process(e, schemaTree).stream())
+ .flatMap(e -> process(e, context).stream())
.collect(Collectors.toList());
Expression countTimeExpression =
@@ -73,7 +76,7 @@ public class BindSchemaForExpressionVisitor extends
CartesianProductVisitor<ISch
// to collect the produced expressions.
List<List<Expression>> extendedExpressions = new ArrayList<>();
for (Expression originExpression : functionExpression.getExpressions()) {
- List<Expression> actualExpressions = process(originExpression,
schemaTree);
+ List<Expression> actualExpressions = process(originExpression, context);
if (actualExpressions.isEmpty()) {
// Let's ignore the eval of the function which has at least one
non-existence series as
// input. See IOTDB-1212: https://github.com/apache/iotdb/pull/3101
@@ -96,15 +99,16 @@ public class BindSchemaForExpressionVisitor extends
CartesianProductVisitor<ISch
List<List<Expression>> childExpressionsList = new ArrayList<>();
cartesianProduct(extendedExpressions, childExpressionsList, 0, new
ArrayList<>());
- return reconstructFunctionExpressions(functionExpression,
childExpressionsList);
+ return reconstructFunctionExpressionsWithMemoryCheck(
+ functionExpression, childExpressionsList, context.getQueryContext());
}
@Override
public List<Expression> visitTimeSeriesOperand(
- TimeSeriesOperand timeSeriesOperand, ISchemaTree schemaTree) {
+ TimeSeriesOperand timeSeriesOperand, Context context) {
PartialPath timeSeriesOperandPath = timeSeriesOperand.getPath();
List<MeasurementPath> actualPaths =
- schemaTree.searchMeasurementPaths(timeSeriesOperandPath).left;
+
context.getSchemaTree().searchMeasurementPaths(timeSeriesOperandPath).left;
// process logical view
List<MeasurementPath> nonViewActualPaths = new ArrayList<>();
List<MeasurementPath> viewPaths = new ArrayList<>();
@@ -116,10 +120,11 @@ public class BindSchemaForExpressionVisitor extends
CartesianProductVisitor<ISch
}
}
List<Expression> reconstructTimeSeriesOperands =
- ExpressionUtils.reconstructTimeSeriesOperands(timeSeriesOperand,
nonViewActualPaths);
+ ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck(
+ timeSeriesOperand, nonViewActualPaths, context.getQueryContext());
// handle logical views
for (MeasurementPath measurementPath : viewPaths) {
- Expression replacedExpression = transformViewPath(measurementPath,
schemaTree);
+ Expression replacedExpression = transformViewPath(measurementPath,
context.getSchemaTree());
replacedExpression.setViewPath(measurementPath);
reconstructTimeSeriesOperands.add(replacedExpression);
}
@@ -128,13 +133,12 @@ public class BindSchemaForExpressionVisitor extends
CartesianProductVisitor<ISch
@Override
public List<Expression> visitTimeStampOperand(
- TimestampOperand timestampOperand, ISchemaTree schemaTree) {
+ TimestampOperand timestampOperand, Context context) {
return Collections.singletonList(timestampOperand);
}
@Override
- public List<Expression> visitConstantOperand(
- ConstantOperand constantOperand, ISchemaTree schemaTree) {
+ public List<Expression> visitConstantOperand(ConstantOperand
constantOperand, Context context) {
return Collections.singletonList(constantOperand);
}
@@ -152,4 +156,24 @@ public class BindSchemaForExpressionVisitor extends
CartesianProductVisitor<ISch
expression = new CompleteMeasurementSchemaVisitor().process(expression,
schemaTree);
return expression;
}
+
+ public static class Context implements QueryContextProvider { // visitor context: bundles the schema tree with the query context handed to the *WithMemoryCheck helpers
+ private final ISchemaTree schemaTree;
+ private final MPPQueryContext queryContext;
+
+ public Context(final ISchemaTree schemaTree, final MPPQueryContext
queryContext) {
+ this.schemaTree = schemaTree; // stored as given; no null check is applied to the schema tree here
+ Validate.notNull(queryContext, "QueryContext is null"); // reject a missing query context at construction time rather than at first use
+ this.queryContext = queryContext;
+ }
+
+ public ISchemaTree getSchemaTree() { // schema tree used by the visitor to search measurement paths
+ return schemaTree;
+ }
+
+ @Override
+ public MPPQueryContext getQueryContext() { // QueryContextProvider accessor; never null because the constructor validates it
+ return queryContext;
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java
index f9c9d7e7d0e..485a5ba3d4c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
@@ -32,6 +33,8 @@ import
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.commons.lang3.Validate;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -39,9 +42,9 @@ import java.util.List;
import java.util.stream.Collectors;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct;
-import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructBinaryExpressions;
-import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions;
-import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperands;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructBinaryExpressionsWithMemoryCheck;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck;
import static
org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian.BindSchemaForExpressionVisitor.transformViewPath;
import static
org.apache.iotdb.db.utils.TypeInferenceUtils.bindTypeForBuiltinAggregationNonSeriesInputExpressions;
import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
@@ -61,7 +64,8 @@ public class BindSchemaForPredicateVisitor
resultExpressions.addAll(rightExpressions);
return resultExpressions;
}
- return reconstructBinaryExpressions(binaryExpression, leftExpressions,
rightExpressions);
+ return reconstructBinaryExpressionsWithMemoryCheck(
+ binaryExpression, leftExpressions, rightExpressions,
context.getQueryContext());
}
@Override
@@ -86,7 +90,11 @@ public class BindSchemaForPredicateVisitor
extendedExpressions.add(
process(
suffixExpression,
- new Context(context.getPrefixPaths(), context.getSchemaTree(),
false)));
+ new Context(
+ context.getPrefixPaths(),
+ context.getSchemaTree(),
+ false,
+ context.getQueryContext())));
// We just process first input Expression of Count_IF,
// keep other input Expressions as origin and bind Type
@@ -99,7 +107,8 @@ public class BindSchemaForPredicateVisitor
}
List<List<Expression>> childExpressionsList = new ArrayList<>();
cartesianProduct(extendedExpressions, childExpressionsList, 0, new
ArrayList<>());
- return reconstructFunctionExpressions(predicate, childExpressionsList);
+ return reconstructFunctionExpressionsWithMemoryCheck(
+ predicate, childExpressionsList, context.getQueryContext());
}
@Override
@@ -130,7 +139,8 @@ public class BindSchemaForPredicateVisitor
}
}
List<Expression> reconstructTimeSeriesOperands =
- reconstructTimeSeriesOperands(predicate, nonViewPathList);
+ reconstructTimeSeriesOperandsWithMemoryCheck(
+ predicate, nonViewPathList, context.getQueryContext());
for (MeasurementPath measurementPath : viewPathList) {
Expression replacedExpression = transformViewPath(measurementPath,
context.getSchemaTree());
replacedExpression.setViewPath(measurementPath);
@@ -150,19 +160,27 @@ public class BindSchemaForPredicateVisitor
return Collections.singletonList(constantOperand);
}
- public static class Context {
+ public static class Context implements QueryContextProvider {
private final List<PartialPath> prefixPaths;
private final ISchemaTree schemaTree;
private final boolean isRoot;
- public Context(List<PartialPath> prefixPaths, ISchemaTree schemaTree,
boolean isRoot) {
+ private final MPPQueryContext queryContext;
+
+ public Context(
+ final List<PartialPath> prefixPaths,
+ final ISchemaTree schemaTree,
+ final boolean isRoot,
+ final MPPQueryContext queryContext) {
this.prefixPaths = prefixPaths;
this.schemaTree = schemaTree;
this.isRoot = isRoot;
+ Validate.notNull(queryContext, "QueryContext is null");
+ this.queryContext = queryContext;
}
public Context notRootClone() {
- return new Context(this.prefixPaths, this.schemaTree, false);
+ return new Context(this.prefixPaths, this.schemaTree, false,
queryContext);
}
public List<PartialPath> getPrefixPaths() {
@@ -176,5 +194,10 @@ public class BindSchemaForPredicateVisitor
public boolean isRoot() {
return isRoot;
}
+
+ @Override
+ public MPPQueryContext getQueryContext() { // QueryContextProvider accessor; the visitor passes this to the reconstruct*WithMemoryCheck helpers
+ return queryContext;
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/CartesianProductVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/CartesianProductVisitor.java
index 0f8e7decc62..78a9b2ab974 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/CartesianProductVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/CartesianProductVisitor.java
@@ -32,34 +32,39 @@ import java.util.Collections;
import java.util.List;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct;
-import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructBinaryExpressions;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructBinaryExpressionsWithMemoryCheck;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructCaseWhenThenExpression;
-import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTernaryExpressions;
-import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructUnaryExpressions;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTernaryExpressionsWithMemoryCheck;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructUnaryExpressionsWithMemoryCheck;
-public abstract class CartesianProductVisitor<C>
+public abstract class CartesianProductVisitor<C extends QueryContextProvider>
extends ExpressionAnalyzeVisitor<List<Expression>, C> {
@Override
public List<Expression> visitTernaryExpression(TernaryExpression
ternaryExpression, C context) {
List<List<Expression>> childResultsList =
getResultsFromChild(ternaryExpression, context);
- return reconstructTernaryExpressions(
+ return reconstructTernaryExpressionsWithMemoryCheck(
ternaryExpression,
childResultsList.get(0),
childResultsList.get(1),
- childResultsList.get(2));
+ childResultsList.get(2),
+ context.getQueryContext());
}
@Override
public List<Expression> visitBinaryExpression(BinaryExpression
binaryExpression, C context) {
List<List<Expression>> childResultsList =
getResultsFromChild(binaryExpression, context);
- return reconstructBinaryExpressions(
- binaryExpression, childResultsList.get(0), childResultsList.get(1));
+ return reconstructBinaryExpressionsWithMemoryCheck(
+ binaryExpression,
+ childResultsList.get(0),
+ childResultsList.get(1),
+ context.getQueryContext());
}
@Override
public List<Expression> visitUnaryExpression(UnaryExpression
unaryExpression, C context) {
List<List<Expression>> childResultsList =
getResultsFromChild(unaryExpression, context);
- return reconstructUnaryExpressions(unaryExpression,
childResultsList.get(0));
+ return reconstructUnaryExpressionsWithMemoryCheck(
+ unaryExpression, childResultsList.get(0), context.getQueryContext());
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java
index 25963b4b4f7..fc0bdb6f669 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
@@ -31,6 +32,8 @@ import
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.commons.lang3.Validate;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -39,7 +42,7 @@ import java.util.List;
import java.util.stream.Collectors;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct;
-import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck;
import static
org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian.BindSchemaForExpressionVisitor.transformViewPath;
import static
org.apache.iotdb.db.utils.TypeInferenceUtils.bindTypeForBuiltinAggregationNonSeriesInputExpressions;
import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
@@ -82,7 +85,8 @@ public class ConcatDeviceAndBindSchemaForExpressionVisitor
List<List<Expression>> childExpressionsList = new ArrayList<>();
cartesianProduct(extendedExpressions, childExpressionsList, 0, new
ArrayList<>());
- return reconstructFunctionExpressions(functionExpression,
childExpressionsList);
+ return reconstructFunctionExpressionsWithMemoryCheck(
+ functionExpression, childExpressionsList, context.getQueryContext());
}
@Override
@@ -108,7 +112,8 @@ public class ConcatDeviceAndBindSchemaForExpressionVisitor
}
}
List<Expression> reconstructTimeSeriesOperands =
- ExpressionUtils.reconstructTimeSeriesOperands(timeSeriesOperand,
nonViewActualPaths);
+ ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck(
+ timeSeriesOperand, nonViewActualPaths, context.getQueryContext());
// handle logical views
for (MeasurementPath measurementPath : viewPaths) {
Expression replacedExpression = transformViewPath(measurementPath,
context.getSchemaTree());
@@ -134,13 +139,20 @@ public class ConcatDeviceAndBindSchemaForExpressionVisitor
return Collections.singletonList(constantOperand);
}
- public static class Context {
+ public static class Context implements QueryContextProvider {
private final PartialPath devicePath;
private final ISchemaTree schemaTree;
- public Context(PartialPath devicePath, ISchemaTree schemaTree) {
+ private final MPPQueryContext queryContext;
+
+ public Context(
+ final PartialPath devicePath,
+ final ISchemaTree schemaTree,
+ final MPPQueryContext queryContext) {
this.devicePath = devicePath;
this.schemaTree = schemaTree;
+ Validate.notNull(queryContext, "QueryContext is null");
+ this.queryContext = queryContext;
}
public PartialPath getDevicePath() {
@@ -150,5 +162,10 @@ public class ConcatDeviceAndBindSchemaForExpressionVisitor
public ISchemaTree getSchemaTree() {
return schemaTree;
}
+
+ @Override
+ public MPPQueryContext getQueryContext() { // QueryContextProvider accessor; the visitor passes this to the reconstruct*WithMemoryCheck helpers
+ return queryContext;
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java
index 7284568f34f..372c861481a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
@@ -30,13 +31,15 @@ import
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
+import org.apache.commons.lang3.Validate;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct;
-import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions;
-import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperands;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck;
import static
org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian.BindSchemaForExpressionVisitor.transformViewPath;
public class ConcatDeviceAndBindSchemaForPredicateVisitor
@@ -53,7 +56,8 @@ public class ConcatDeviceAndBindSchemaForPredicateVisitor
}
List<List<Expression>> childExpressionsList = new ArrayList<>();
cartesianProduct(extendedExpressions, childExpressionsList, 0, new
ArrayList<>());
- return reconstructFunctionExpressions(predicate, childExpressionsList);
+ return reconstructFunctionExpressionsWithMemoryCheck(
+ predicate, childExpressionsList, context.getQueryContext());
}
@Override
@@ -77,7 +81,8 @@ public class ConcatDeviceAndBindSchemaForPredicateVisitor
}
List<Expression> reconstructTimeSeriesOperands =
- reconstructTimeSeriesOperands(predicate, nonViewPathList);
+ reconstructTimeSeriesOperandsWithMemoryCheck(
+ predicate, nonViewPathList, context.getQueryContext());
for (MeasurementPath measurementPath : viewPathList) {
Expression replacedExpression = transformViewPath(measurementPath,
context.getSchemaTree());
if (!(replacedExpression instanceof TimeSeriesOperand)) {
@@ -102,15 +107,23 @@ public class ConcatDeviceAndBindSchemaForPredicateVisitor
return Collections.singletonList(constantOperand);
}
- public static class Context {
+ public static class Context implements QueryContextProvider {
private final PartialPath devicePath;
private final ISchemaTree schemaTree;
private final boolean isWhere;
- public Context(PartialPath devicePath, ISchemaTree schemaTree, boolean
isWhere) {
+ private final MPPQueryContext queryContext;
+
+ /**
+ * Creates the visitor context.
+ *
+ * @param devicePath stored in the {@code devicePath} field
+ * @param schemaTree stored in the {@code schemaTree} field
+ * @param isWhere stored in the {@code isWhere} field
+ * @param queryContext query context exposed through {@code getQueryContext()}; must not be
+ * null, otherwise {@code Validate.notNull} rejects it
+ */
+ public Context(
+ final PartialPath devicePath,
+ final ISchemaTree schemaTree,
+ final boolean isWhere,
+ final MPPQueryContext queryContext) {
this.devicePath = devicePath;
this.schemaTree = schemaTree;
this.isWhere = isWhere;
+ Validate.notNull(queryContext, "QueryContext is null");
+ this.queryContext = queryContext;
}
public PartialPath getDevicePath() {
@@ -124,5 +137,10 @@ public class ConcatDeviceAndBindSchemaForPredicateVisitor
public boolean isWhere() {
return isWhere;
}
+
+ /** Returns the query context stored by the constructor. */
+ @Override
+ public MPPQueryContext getQueryContext() {
+ return queryContext;
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java
index 3f53190c00d..7c852083e2b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
@@ -28,6 +29,7 @@ import
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.commons.lang3.Validate;
import org.apache.tsfile.common.constant.TsFileConstant;
import java.util.ArrayList;
@@ -35,8 +37,8 @@ import java.util.Collections;
import java.util.List;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct;
-import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions;
-import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperands;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck;
public class ConcatExpressionWithSuffixPathsVisitor
extends
CartesianProductVisitor<ConcatExpressionWithSuffixPathsVisitor.Context> {
@@ -60,7 +62,8 @@ public class ConcatExpressionWithSuffixPathsVisitor
List<List<Expression>> childExpressionsList = new ArrayList<>();
cartesianProduct(extendedExpressions, childExpressionsList, 0, new
ArrayList<>());
- return reconstructFunctionExpressions(functionExpression,
childExpressionsList);
+ return reconstructFunctionExpressionsWithMemoryCheck(
+ functionExpression, childExpressionsList, context.getQueryContext());
}
@Override
@@ -78,7 +81,8 @@ public class ConcatExpressionWithSuffixPathsVisitor
actualPaths.add(concatPath);
}
}
- return reconstructTimeSeriesOperands(timeSeriesOperand, actualPaths);
+ return reconstructTimeSeriesOperandsWithMemoryCheck(
+ timeSeriesOperand, actualPaths, context.getQueryContext());
}
@Override
@@ -92,13 +96,20 @@ public class ConcatExpressionWithSuffixPathsVisitor
return Collections.singletonList(constantOperand);
}
- public static class Context {
+ public static class Context implements QueryContextProvider {
private final List<PartialPath> prefixPaths;
private final PathPatternTree patternTree;
- public Context(List<PartialPath> prefixPaths, PathPatternTree patternTree)
{
+ private final MPPQueryContext queryContext;
+
+ /**
+ * Creates the visitor context.
+ *
+ * @param prefixPaths stored in the {@code prefixPaths} field
+ * @param patternTree stored in the {@code patternTree} field
+ * @param queryContext query context exposed through {@code getQueryContext()}; must not be
+ * null, otherwise {@code Validate.notNull} rejects it
+ */
+ public Context(
+ final List<PartialPath> prefixPaths,
+ final PathPatternTree patternTree,
+ final MPPQueryContext queryContext) {
this.prefixPaths = prefixPaths;
this.patternTree = patternTree;
+ Validate.notNull(queryContext, "QueryContext is null");
+ this.queryContext = queryContext;
}
public List<PartialPath> getPrefixPaths() {
@@ -108,5 +119,10 @@ public class ConcatExpressionWithSuffixPathsVisitor
public PathPatternTree getPatternTree() {
return patternTree;
}
+
+ /** Returns the query context stored by the constructor. */
+ @Override
+ public MPPQueryContext getQueryContext() {
+ return queryContext;
+ }
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughExceptionTest.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/QueryContextProvider.java
similarity index 61%
rename from
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughExceptionTest.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/QueryContextProvider.java
index dbf5046f675..2eb0ec0b4e7 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughExceptionTest.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/QueryContextProvider.java
@@ -17,21 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.exception;
+package org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian;
-import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class MemoryNotEnoughExceptionTest {
-
- @Test
- public void testMemoryNotEnoughExceptionStatusCode() {
- MemoryNotEnoughException e = new MemoryNotEnoughException("test");
- assertEquals(TSStatusCode.QUOTA_MEM_QUERY_NOT_ENOUGH.getStatusCode(),
e.getErrorCode());
- assertTrue(e.isUserException());
- }
+/**
+ * Exposes the {@link MPPQueryContext} of the current query. The {@code Context} classes of the
+ * cartesian-product expression visitors implement it so that the query context can be handed
+ * to the memory-checking reconstruction helpers.
+ */
+public interface QueryContextProvider {
+ /** Returns the query context carried by this provider. */
+ MPPQueryContext getQueryContext();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
index cf4b3461955..30ae6faff02 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
@@ -60,6 +61,7 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -86,8 +88,17 @@ public class AggregationPushDown implements PlanOptimizer {
|| cannotUseStatistics(queryStatement, analysis)) {
return plan;
}
- return plan.accept(
- new Rewriter(), new RewriterContext(analysis, context,
queryStatement.isAlignByDevice()));
+
+ RewriterContext rewriterContext =
+ new RewriterContext(analysis, context,
queryStatement.isAlignByDevice());
+ PlanNode node;
+ try {
+ node = plan.accept(new Rewriter(), rewriterContext);
+ } finally {
+ // release the last batch of memory
+ rewriterContext.releaseMemoryForFrontEndImmediately();
+ }
+ return node;
}
private boolean cannotUseStatistics(QueryStatement queryStatement, Analysis
analysis) {
@@ -288,12 +299,29 @@ public class AggregationPushDown implements PlanOptimizer
{
PlanNode resultNode = convergeWithTimeJoin(sourceNodeList,
node.getScanOrder(), context);
resultNode = planProject(resultNode, node, context);
+
+ // After pushing down the aggregation, the original scan nodes are no
longer needed, so we
+ // release the memory that they occupied.
+ context.releaseMemoryForFrontEnd(getRamBytesUsedOfOldScanNodes(child));
return resultNode;
}
// cannot push down
return node;
}
+ /**
+ * Sums {@code ramBytesUsed()} of the scan nodes found at or below {@code node}.
+ *
+ * <p>A {@code SeriesScanSourceNode} contributes its own size. A {@code FullOuterTimeJoinNode}
+ * contributes the recursive sum over its children. {@code null} and every other node type
+ * contribute 0 and are not descended into.
+ *
+ * @param node root of the sub-plan to measure, may be null
+ * @return the summed size in bytes, or 0 if no scan node is reachable
+ */
+ private long getRamBytesUsedOfOldScanNodes(final PlanNode node) {
+ if (node == null) {
+ return 0L;
+ }
+ if (node instanceof SeriesScanSourceNode) {
+ SeriesScanSourceNode scanNode = (SeriesScanSourceNode) node;
+ return scanNode.ramBytesUsed();
+ } else if (node instanceof FullOuterTimeJoinNode) {
+ return
node.getChildren().stream().mapToLong(this::getRamBytesUsedOfOldScanNodes).sum();
+ }
+ return 0L;
+ }
+
private void createAggregationDescriptor(
FunctionExpression sourceExpression,
AggregationStep curStep,
@@ -407,19 +435,31 @@ public class AggregationPushDown implements PlanOptimizer
{
GroupByTimeParameter groupByTimeParameter,
RewriterContext context) {
if (selectPath instanceof MeasurementPath) { // non-aligned series
- return new SeriesAggregationScanNode(
- context.genPlanNodeId(),
- (MeasurementPath) selectPath,
- aggregationDescriptorList,
- scanOrder,
- groupByTimeParameter);
+ SeriesAggregationSourceNode node =
+ new SeriesAggregationScanNode(
+ context.genPlanNodeId(),
+ (MeasurementPath) selectPath,
+ aggregationDescriptorList,
+ scanOrder,
+ groupByTimeParameter);
+ context
+ .getContext()
+ .reserveMemoryForFrontEnd(
+
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(node));
+ return node;
} else if (selectPath instanceof AlignedPath) { // aligned series
- return new AlignedSeriesAggregationScanNode(
- context.genPlanNodeId(),
- (AlignedPath) selectPath,
- aggregationDescriptorList,
- scanOrder,
- groupByTimeParameter);
+ SeriesAggregationSourceNode node =
+ new AlignedSeriesAggregationScanNode(
+ context.genPlanNodeId(),
+ (AlignedPath) selectPath,
+ aggregationDescriptorList,
+ scanOrder,
+ groupByTimeParameter);
+ context
+ .getContext()
+ .reserveMemoryForFrontEnd(
+
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(node));
+ return node;
} else {
throw new IllegalArgumentException("unexpected path type");
}
@@ -449,14 +489,19 @@ public class AggregationPushDown implements PlanOptimizer
{
private static class RewriterContext {
+ private static final long RELEASE_BATCH_SIZE = 1024L * 1024L;
+
private final Analysis analysis;
private final MPPQueryContext context;
private final boolean isAlignByDevice;
private String curDevice;
+ private long bytesToBeReleased = 0;
+
+ /**
+ * Creates the rewriter context.
+ *
+ * @param analysis stored in the {@code analysis} field
+ * @param context query context used for front-end memory accounting; must not be null,
+ * otherwise {@code Validate.notNull} rejects it
+ * @param isAlignByDevice stored in the {@code isAlignByDevice} field
+ */
public RewriterContext(Analysis analysis, MPPQueryContext context, boolean
isAlignByDevice) {
this.analysis = analysis;
+ Validate.notNull(context, "Query context cannot be null.");
this.context = context;
this.isAlignByDevice = isAlignByDevice;
}
@@ -473,11 +518,29 @@ public class AggregationPushDown implements PlanOptimizer
{
this.curDevice = curDevice;
}
+ /** Returns the query context stored by the constructor. */
+ public MPPQueryContext getContext() {
+ return context;
+ }
+
public Set<Expression> getAggregationExpressions() {
if (isAlignByDevice) {
return analysis.getDeviceToAggregationExpressions().get(curDevice);
}
return analysis.getAggregationExpressions();
}
+
+ /**
+ * Records {@code bytes} as pending release. The pending total is forwarded to the query
+ * context only once it reaches {@code RELEASE_BATCH_SIZE}, so that many small releases are
+ * batched into one call.
+ *
+ * @param bytes number of bytes that are no longer needed
+ */
+ public void releaseMemoryForFrontEnd(final long bytes) {
+ bytesToBeReleased += bytes;
+ if (bytesToBeReleased >= RELEASE_BATCH_SIZE) {
+ releaseMemoryForFrontEndImmediately();
+ }
+ }
+
+ /**
+ * Forwards all pending bytes to {@code context.releaseMemoryForFrontEnd} and resets the
+ * pending counter to 0. Does nothing when no bytes are pending.
+ */
+ public void releaseMemoryForFrontEndImmediately() {
+ if (bytesToBeReleased > 0) {
+ context.releaseMemoryForFrontEnd(bytesToBeReleased);
+ bytesToBeReleased = 0;
+ }
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 14fd9be4b05..6a8ba336588 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -213,7 +213,27 @@ public class LocalExecutionPlanner {
}
}
- public synchronized void releaseToFreeMemoryForOperators(long memoryInBytes)
{
+ /**
+ * Reserves memory for the planning stage (front end) of a query by deducting it from
+ * {@code freeMemoryForOperators}.
+ *
+ * @param memoryInBytes number of bytes to reserve now
+ * @param reservedBytes total already reserved for the front end of this query; only used in
+ * the error message
+ * @param queryId id of the query; only used in the error message
+ * @throws MemoryNotEnoughException if {@code memoryInBytes} exceeds the remaining free
+ * memory; nothing is deducted in that case
+ */
+ public synchronized void reserveMemoryForQueryFrontEnd(
+ final long memoryInBytes, final long reservedBytes, final String
queryId) {
+ if (memoryInBytes > freeMemoryForOperators) {
+ throw new MemoryNotEnoughException(
+ String.format(
+ "There is not enough memory for planning-stage of Query %s, "
+ + "current remaining free memory is %dB, "
+ + "estimated memory usage is %dB, reserved memory for FE of
this query in total is %dB",
+ queryId, freeMemoryForOperators, memoryInBytes, reservedBytes));
+ } else {
+ freeMemoryForOperators -= memoryInBytes;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "[ConsumeMemory] consume: {}, current remaining memory: {}",
+ memoryInBytes,
+ freeMemoryForOperators);
+ }
+ }
+ }
+
+ /** Adds {@code memoryInBytes} back to {@code freeMemoryForOperators}. */
+ public synchronized void releaseToFreeMemoryForOperators(final long
memoryInBytes) {
freeMemoryForOperators += memoryInBytes;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index b1786c85112..9559b29e976 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.commons.schema.filter.SchemaFilter;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import
org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
@@ -82,6 +83,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeri
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
@@ -138,6 +140,7 @@ public class LogicalPlanBuilder {
+ /**
+ * Creates a plan builder.
+ *
+ * @param analysis stored in the {@code analysis} field
+ * @param context query context stored in the {@code context} field; must not be null,
+ * otherwise {@code Validate.notNull} rejects it
+ */
public LogicalPlanBuilder(Analysis analysis, MPPQueryContext context) {
this.analysis = analysis;
+ Validate.notNull(context, "Query context cannot be null");
this.context = context;
}
@@ -195,27 +198,26 @@ public class LogicalPlanBuilder {
for (PartialPath path : groupedPaths) {
if (path instanceof MeasurementPath) {
// non-aligned series
- SeriesScanNode seriesScanNode =
- new SeriesScanNode(
- context.getQueryId().genPlanNodeId(),
- (MeasurementPath) path,
- scanOrder,
- limit,
- offset,
- null);
- sourceNodeList.add(seriesScanNode);
+ sourceNodeList.add(
+ reserveMemoryForSeriesSourceNode(
+ new SeriesScanNode(
+ context.getQueryId().genPlanNodeId(),
+ (MeasurementPath) path,
+ scanOrder,
+ limit,
+ offset,
+ null)));
} else if (path instanceof AlignedPath) {
- // aligned series
- AlignedSeriesScanNode alignedSeriesScanNode =
- new AlignedSeriesScanNode(
- context.getQueryId().genPlanNodeId(),
- (AlignedPath) path,
- scanOrder,
- limit,
- offset,
- null,
- lastLevelUseWildcard);
- sourceNodeList.add(alignedSeriesScanNode);
+ sourceNodeList.add(
+ reserveMemoryForSeriesSourceNode(
+ new AlignedSeriesScanNode(
+ context.getQueryId().genPlanNodeId(),
+ (AlignedPath) path,
+ scanOrder,
+ limit,
+ offset,
+ null,
+ lastLevelUseWildcard)));
} else {
throw new IllegalArgumentException("Unexpected path type");
}
@@ -274,14 +276,16 @@ public class LogicalPlanBuilder {
if (selectedPath.isUnderAlignedEntity()) { // aligned series
sourceNodeList.add(
- new AlignedLastQueryScanNode(
- context.getQueryId().genPlanNodeId(),
- new AlignedPath(selectedPath),
- outputViewPath));
+ reserveMemoryForSeriesSourceNode(
+ new AlignedLastQueryScanNode(
+ context.getQueryId().genPlanNodeId(),
+ new AlignedPath(selectedPath),
+ outputViewPath)));
} else { // non-aligned series
sourceNodeList.add(
- new LastQueryScanNode(
- context.getQueryId().genPlanNodeId(), selectedPath,
outputViewPath));
+ reserveMemoryForSeriesSourceNode(
+ new LastQueryScanNode(
+ context.getQueryId().genPlanNodeId(), selectedPath,
outputViewPath)));
}
}
} else {
@@ -296,15 +300,18 @@ public class LogicalPlanBuilder {
alignedPath.addMeasurement(measurementPath);
}
sourceNodeList.add(
- new AlignedLastQueryScanNode(
- context.getQueryId().genPlanNodeId(), alignedPath, null));
+ reserveMemoryForSeriesSourceNode(
+ new AlignedLastQueryScanNode(
+ context.getQueryId().genPlanNodeId(), alignedPath,
null)));
} else {
// non-aligned series
for (Expression sourceExpression :
measurementToExpressionsOfDevice.values()) {
MeasurementPath selectedPath =
(MeasurementPath) ((TimeSeriesOperand)
sourceExpression).getPath();
sourceNodeList.add(
- new LastQueryScanNode(context.getQueryId().genPlanNodeId(),
selectedPath, null));
+ reserveMemoryForSeriesSourceNode(
+ new LastQueryScanNode(
+ context.getQueryId().genPlanNodeId(), selectedPath,
null)));
}
}
}
@@ -1356,4 +1363,15 @@ public class LogicalPlanBuilder {
this.root = timeseriesRegionScanNode;
return this;
}
+
+ /**
+ * Reserves front-end (planning-stage) memory for the given source node and returns the same
+ * node, so the call can wrap a node constructor inside {@code sourceNodeList.add(...)}.
+ *
+ * <p>A query that involves many series creates many SeriesSourceNodes, so the memory they use
+ * has to be checked. The number of other PlanNodes is rather small compared to the source
+ * nodes and is safely ignored for now.
+ *
+ * @param sourceNode the source node whose estimated size is reserved
+ * @return {@code sourceNode}, unchanged
+ */
+ private PlanNode reserveMemoryForSeriesSourceNode(final SeriesSourceNode
sourceNode) {
+ this.context.reserveMemoryForFrontEnd(
+
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceNode));
+ return sourceNode;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
index 6c49e4478b8..c0f66e46c6c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.planner.distribution;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.commons.lang3.Validate;
import org.apache.tsfile.read.filter.basic.Filter;
import java.util.List;
@@ -43,6 +44,7 @@ public class DistributionPlanContext {
+ /**
+ * Creates a context with {@code isRoot} set to true.
+ *
+ * @param queryContext query context exposed through {@code getQueryContext()}; must not be
+ * null, otherwise {@code Validate.notNull} rejects it
+ */
protected DistributionPlanContext(MPPQueryContext queryContext) {
this.isRoot = true;
+ Validate.notNull(queryContext, "Query context cannot be null");
this.queryContext = queryContext;
}
@@ -90,4 +92,8 @@ public class DistributionPlanContext {
public boolean isOneSeriesInMultiRegion() {
return oneSeriesInMultiRegion;
}
+
+ /** Returns the query context held by this distribution plan context. */
+ public MPPQueryContext getQueryContext() {
+ return queryContext;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 3ee8167c662..cae6acfaf63 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
@@ -799,6 +800,10 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
SeriesSourceNode split = (SeriesSourceNode) node.clone();
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
split.setRegionReplicaSet(dataRegion);
+ context
+ .getQueryContext()
+ .reserveMemoryForFrontEnd(
+
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(split));
ret.add(split);
}
return ret;
@@ -858,6 +863,10 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
split.setAggregationDescriptorList(leafAggDescriptorList);
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
split.setRegionReplicaSet(dataRegion);
+ context
+ .getQueryContext()
+ .reserveMemoryForFrontEnd(
+
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(split));
aggregationNode.addChild(split);
}
return Collections.singletonList(aggregationNode);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
index 764aaf627f7..c00e7ed81ec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -30,6 +31,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.eclipse.jetty.util.StringUtil;
@@ -43,6 +45,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
public class AlignedLastQueryScanNode extends LastSeriesSourceNode {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(AlignedLastQueryScanNode.class);
+
// The path of the target series which will be scanned.
private final AlignedPath seriesPath;
@@ -229,4 +234,12 @@ public class AlignedLastQueryScanNode extends
LastSeriesSourceNode {
public PartialPath getPartitionPath() {
return getSeriesPath();
}
+
+ /**
+ * Estimates the memory held by this node: the shallow instance size plus the estimated
+ * sizes of {@code id}, {@code seriesPath} and the {@code outputViewPath} string.
+ */
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id)
+ + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath)
+ + RamUsageEstimator.sizeOf(outputViewPath);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
index 2b69cdb0bc4..68a284c018d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -35,6 +36,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimePa
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nullable;
@@ -47,6 +49,8 @@ import java.util.List;
import java.util.Objects;
public class AlignedSeriesAggregationScanNode extends
SeriesAggregationSourceNode {
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(AlignedSeriesAggregationScanNode.class);
// The paths of the target series which will be aggregated.
private AlignedPath alignedPath;
@@ -294,4 +298,11 @@ public class AlignedSeriesAggregationScanNode extends
SeriesAggregationSourceNod
this.getAggregationDescriptorList(),
PlanNodeUtil.printRegionReplicaSet(getRegionReplicaSet()));
}
+
+ /**
+ * Estimates the memory held by this node: the shallow instance size plus the estimated
+ * sizes of {@code id} and {@code alignedPath}. No other referenced object is added to the
+ * estimate.
+ */
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id)
+ + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(alignedPath);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
index abf9a70a09f..327899e4bf6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -33,6 +34,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nullable;
@@ -46,6 +48,9 @@ import java.util.Objects;
public class AlignedSeriesScanNode extends SeriesScanSourceNode {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(AlignedSeriesScanNode.class);
+
// The paths of the target series which will be scanned.
private final AlignedPath alignedPath;
@@ -260,4 +265,11 @@ public class AlignedSeriesScanNode extends
SeriesScanSourceNode {
public PartialPath getPartitionPath() {
return getAlignedPath();
}
+
+ /**
+ * Estimates the memory held by this node: the shallow instance size plus the estimated
+ * sizes of {@code id} and {@code alignedPath}.
+ */
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id)
+ + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(alignedPath);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java
index 6c9cbaa1adb..8d4f8291a62 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -30,6 +31,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.eclipse.jetty.util.StringUtil;
@@ -42,6 +44,9 @@ import java.util.concurrent.atomic.AtomicInteger;
public class LastQueryScanNode extends LastSeriesSourceNode {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(LastQueryScanNode.class);
+
public static final List<String> LAST_QUERY_HEADER_COLUMNS =
ImmutableList.of(
ColumnHeaderConstant.TIMESERIES,
@@ -229,4 +234,12 @@ public class LastQueryScanNode extends
LastSeriesSourceNode {
return outputViewPath;
}
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id)
+ + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath)
+ + RamUsageEstimator.sizeOf(outputViewPath);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java
index 8c3bcb6ec61..01e801cad22 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -34,6 +35,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimePa
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nullable;
@@ -62,6 +64,9 @@ import java.util.Objects;
*/
public class SeriesAggregationScanNode extends SeriesAggregationSourceNode {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(SeriesAggregationScanNode.class);
+
// The path of the target series which will be aggregated.
private final MeasurementPath seriesPath;
@@ -297,4 +302,11 @@ public class SeriesAggregationScanNode extends
SeriesAggregationSourceNode {
this.getAggregationDescriptorList(),
PlanNodeUtil.printRegionReplicaSet(this.getRegionReplicaSet()));
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id)
+ + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java
index 3bd1087f624..587d1ac248c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -31,6 +32,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nullable;
@@ -50,6 +52,9 @@ import java.util.Objects;
*/
public class SeriesScanNode extends SeriesScanSourceNode {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(SeriesScanNode.class);
+
// The path of the target series which will be scanned.
private final MeasurementPath seriesPath;
@@ -195,4 +200,11 @@ public class SeriesScanNode extends SeriesScanSourceNode {
public PartialPath getPartitionPath() {
return getSeriesPath();
}
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id)
+ + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java
index d5b9c64f321..c2e052e76b7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java
@@ -23,7 +23,9 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
-public abstract class SeriesSourceNode extends SourceNode {
+import org.apache.tsfile.utils.Accountable;
+
+public abstract class SeriesSourceNode extends SourceNode implements
Accountable {
protected SeriesSourceNode(PlanNodeId id) {
super(id);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 837e4b7ef03..e698ba8487e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.protocol.thrift.OperationType;
+import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -147,6 +148,8 @@ public class ErrorHandlingUtils {
((IoTDBException) t.getCause()).getErrorCode(),
rootCause.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SEMANTIC_ERROR,
rootCause.getMessage());
+ } else if (t instanceof MemoryNotEnoughException) {
+ return RpcUtils.getStatus(TSStatusCode.QUOTA_MEM_QUERY_NOT_ENOUGH,
rootCause.getMessage());
}
if (t instanceof RuntimeException && rootCause instanceof IoTDBException) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzerTest.java
index a4f36c14dd0..1cd47c695d3 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzerTest.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.queryengine.plan.analyze;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.junit.Test;
@@ -55,7 +57,8 @@ public class ExpressionAnalyzerTest {
and(gt(timeSeries("s1"), intValue("1")), gt(timeSeries("s2"),
intValue("1"))),
prefixPaths,
fakeSchemaTree,
- true));
+ true,
+ new MPPQueryContext(new QueryId("test"))));
assertEquals(
Arrays.asList(
@@ -79,6 +82,7 @@ public class ExpressionAnalyzerTest {
count(and(gt(timeSeries("s1"), intValue("1")),
gt(timeSeries("s2"), intValue("1")))),
prefixPaths,
fakeSchemaTree,
- true));
+ true,
+ new MPPQueryContext(new QueryId("test"))));
}
}