This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8e7c5f6a49f2285e711509d6247884da45db8d28
Author: Minghui Liu <[email protected]>
AuthorDate: Wed Aug 10 11:41:04 2022 +0800

    memory control for AggregationOperator
---
 .../execution/operator/process/AggregationOperator.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 8cf4aef8b0..92da980d1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 /**
  * AggregationOperator can process the situation: aggregation of intermediate aggregate result, it
@@ -95,6 +96,22 @@ public class AggregationOperator implements ProcessOperator {
     return operatorContext;
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = calculateMaxReturnSize();
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory += child.calculateMaxReturnSize();
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return (1L + inputOperatorsCount) * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
   @Override
   public ListenableFuture<?> isBlocked() {
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();

Reply via email to