This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/AggOpMemoryControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5e9192cdf2041fe8db4af599dcf25973906abf24 Author: liuminghui233 <[email protected]> AuthorDate: Mon Aug 15 00:16:00 2022 +0800 finish memory calculate for AggregationOperator --- .../db/mpp/execution/operator/AggregationUtil.java | 2 +- .../operator/process/AggregationOperator.java | 36 ++++++++++++---------- .../db/mpp/plan/planner/OperatorTreeGenerator.java | 30 +++++++++++++++--- .../operator/AggregationOperatorTest.java | 5 ++- 4 files changed, 48 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java index 1896fc3af4..564f21245e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java @@ -177,7 +177,7 @@ public class AggregationUtil { } public static long calculateMaxAggregationResultSize( - List<AggregationDescriptor> aggregationDescriptors, + List<? extends AggregationDescriptor> aggregationDescriptors, ITimeRangeIterator timeRangeIterator, boolean isGroupByQuery, TypeProvider typeProvider) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java index 92da980d1e..7efe992cdd 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java @@ -22,7 +22,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator; import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; -import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -37,8 +36,6 @@ import java.util.concurrent.TimeUnit; import static com.google.common.util.concurrent.Futures.successfulAsList; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult; -import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator; -import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; /** * AggregationOperator can process the situation: aggregation of intermediate aggregate result, it @@ -63,16 +60,20 @@ public class AggregationOperator implements ProcessOperator { // using for building result tsBlock private final TsBlockBuilder resultTsBlockBuilder; + private final long maxRetainedSize; + private final long childrenRetainedSize; + private final long maxReturnSize; + public AggregationOperator( OperatorContext operatorContext, List<Aggregator> aggregators, + ITimeRangeIterator timeRangeIterator, List<Operator> children, - boolean ascending, - GroupByTimeParameter groupByTimeParameter, - boolean outputPartialTimeWindow) { + long maxReturnSize) { this.operatorContext = operatorContext; this.children = children; this.aggregators = aggregators; + this.timeRangeIterator = timeRangeIterator; this.inputOperatorsCount = children.size(); this.inputTsBlocks = new TsBlock[inputOperatorsCount]; @@ -81,14 +82,16 @@ public class AggregationOperator implements ProcessOperator { canCallNext[i] = false; } - this.timeRangeIterator = - initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow); - List<TSDataType> dataTypes = new ArrayList<>(); for (Aggregator aggregator : aggregators) { dataTypes.addAll(Arrays.asList(aggregator.getOutputType())); } this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes); + + this.maxRetainedSize = children.stream().mapToLong(Operator::calculateMaxReturnSize).sum(); + this.childrenRetainedSize = + children.stream().mapToLong(Operator::calculateRetainedSizeAfterCallingNext).sum(); + this.maxReturnSize = maxReturnSize; } @Override @@ -98,18 +101,17 @@ public class AggregationOperator implements ProcessOperator { @Override public long calculateMaxPeekMemory() { - long maxPeekMemory = calculateMaxReturnSize(); - long childrenMaxPeekMemory = 0; - for (Operator child : children) { - maxPeekMemory += child.calculateMaxReturnSize(); - childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory()); - } - return Math.max(maxPeekMemory, childrenMaxPeekMemory); + return maxReturnSize + maxRetainedSize + childrenRetainedSize; } @Override public long calculateMaxReturnSize() { - return (1L + inputOperatorsCount) * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + return maxReturnSize; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return maxRetainedSize + childrenRetainedSize; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index 50d65c2bbb..0978a006f5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -1045,7 +1045,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP boolean ascending = node.getScanOrder() == Ordering.ASC; List<Aggregator> aggregators = new ArrayList<>(); Map<String, List<InputLocation>> layout = makeLayout(node); - for (GroupByLevelDescriptor descriptor : node.getGroupByLevelDescriptors()) { + List<GroupByLevelDescriptor> aggregationDescriptors = node.getGroupByLevelDescriptors(); + for (GroupByLevelDescriptor descriptor : aggregationDescriptors) { List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout); TSDataType seriesDataType = context @@ -1067,9 +1068,19 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getPlanNodeId(), AggregationOperator.class.getSimpleName()); + GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter(); + ITimeRangeIterator timeRangeIterator = + initTimeRangeIterator(groupByTimeParameter, ascending, false); + long maxReturnSize = + calculateMaxAggregationResultSize( + aggregationDescriptors, + timeRangeIterator, + groupByTimeParameter != null, + context.getTypeProvider()); + context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size()); return new AggregationOperator( - operatorContext, aggregators, children, ascending, node.getGroupByTimeParameter(), false); + operatorContext, aggregators, timeRangeIterator, children, maxReturnSize); } @Override @@ -1183,6 +1194,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP inputLocationList)); } boolean inputRaw = node.getAggregationDescriptorList().get(0).getStep().isInputRaw(); + GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter(); + if (inputRaw) { checkArgument(children.size() == 1, "rawDataAggregateOperator can only accept one input"); OperatorContext operatorContext = @@ -1194,7 +1207,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP RawDataAggregationOperator.class.getSimpleName()); context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size()); - GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter(); ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true); long maxReturnSize = @@ -1219,9 +1231,19 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), AggregationOperator.class.getSimpleName()); + + ITimeRangeIterator timeRangeIterator = + initTimeRangeIterator(groupByTimeParameter, ascending, true); + long maxReturnSize = + calculateMaxAggregationResultSize( + aggregationDescriptors, + timeRangeIterator, + groupByTimeParameter != null, + context.getTypeProvider()); + context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size()); return new AggregationOperator( - operatorContext, aggregators, children, ascending, node.getGroupByTimeParameter(), true); + operatorContext, aggregators, timeRangeIterator, children, maxReturnSize); } } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java index b3e80f1b82..85e65849a6 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java @@ -374,9 +374,8 @@ public class AggregationOperatorTest { return new AggregationOperator( fragmentInstanceContext.getOperatorContexts().get(2), finalAggregators, + initTimeRangeIterator(groupByTimeParameter, true, true), children, - true, - groupByTimeParameter, - true); + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES); } }
