This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/MemoryControl by this push:
     new fb2f684107 [To MemoryControl] [IOTDB-4081] Implementation of operators 
related to aggregation for memory control (#6993)
fb2f684107 is described below

commit fb2f684107a54357d4a3d3bd1156289572f5f524
Author: liuminghui233 <[email protected]>
AuthorDate: Mon Aug 15 10:57:54 2022 +0800

    [To MemoryControl] [IOTDB-4081] Implementation of operators related to 
aggregation for memory control (#6993)
---
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |   6 +-
 .../db/metadata/rescon/SchemaResourceManager.java  |   4 +-
 ...tatistics.java => SchemaStatisticsManager.java} |  12 +-
 .../schemaregion/SchemaRegionMemoryImpl.java       |  12 +-
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |  12 +-
 .../timerangeiterator/AggrWindowIterator.java      |  23 ++
 .../timerangeiterator/ITimeRangeIterator.java      |   2 +
 .../timerangeiterator/PreAggrWindowIterator.java   |  17 +
 .../PreAggrWindowWithNaturalMonthIterator.java     |  16 +
 .../SingleTimeWindowIterator.java                  |   5 +
 .../db/mpp/execution/operator/AggregationUtil.java |  81 ++++
 .../operator/process/AggregationOperator.java      |  35 +-
 .../process/RawDataAggregationOperator.java        |   7 +-
 .../process/SingleInputAggregationOperator.java    |  31 +-
 .../process/SlidingWindowAggregationOperator.java  |   6 +-
 .../AbstractSeriesAggregationScanOperator.java     |  31 +-
 .../AlignedSeriesAggregationScanOperator.java      |   9 +-
 .../source/SeriesAggregationScanOperator.java      |   9 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 198 ++++++---
 .../iotdb/db/mpp/statistics/StatisticsManager.java |  46 +++
 .../iotdb/db/mpp/statistics/TimeseriesStats.java   |  24 ++
 .../iotdb/db/utils/datastructure/TimeSelector.java |   5 +
 .../db/mpp/aggregation/TimeRangeIteratorTest.java  |   2 +
 .../operator/AggregationOperatorTest.java          |  15 +-
 .../AlignedSeriesAggregationScanOperatorTest.java  |  11 +-
 .../execution/operator/LastQueryOperatorTest.java  |  18 +-
 .../operator/LastQuerySortOperatorTest.java        |  18 +-
 .../mpp/execution/operator/OperatorMemoryTest.java | 460 +++++++++++++++++++++
 .../operator/RawDataAggregationOperatorTest.java   |   5 +-
 .../SeriesAggregationScanOperatorTest.java         |   6 +-
 .../SlidingWindowAggregationOperatorTest.java      |  10 +-
 .../operator/UpdateLastCacheOperatorTest.java      |   6 +-
 32 files changed, 1018 insertions(+), 124 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 32518a6239..fafd13ddd9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.template.Template;
@@ -466,7 +466,7 @@ public class LocalSchemaProcessor {
     // todo this is for test assistance, refactor this to support massive 
timeseries
     if (pathPattern.getFullPath().equals("root.**")
         && TemplateManager.getInstance().getAllTemplateName().isEmpty()) {
-      return (int) TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+      return (int) 
SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
     }
     int count = 0;
     for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, 
isPrefixMatch)) {
@@ -1380,7 +1380,7 @@ public class LocalSchemaProcessor {
 
   @TestOnly
   public long getTotalSeriesNumber() {
-    return TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+    return SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
   }
 
   /**
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
index 0fe18ac955..c5df8ff9a7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
@@ -30,7 +30,7 @@ public class SchemaResourceManager {
   private SchemaResourceManager() {}
 
   public static void initSchemaResource() {
-    TimeseriesStatistics.getInstance().init();
+    SchemaStatisticsManager.getInstance().init();
     MemoryStatistics.getInstance().init();
     if (IoTDBDescriptor.getInstance()
         .getConfig()
@@ -41,7 +41,7 @@ public class SchemaResourceManager {
   }
 
   public static void clearSchemaResource() {
-    TimeseriesStatistics.getInstance().clear();
+    SchemaStatisticsManager.getInstance().clear();
     MemoryStatistics.getInstance().clear();
     if (IoTDBDescriptor.getInstance()
         .getConfig()
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
similarity index 86%
rename from 
server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
rename to 
server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
index f8a1f21a20..6d63cdfe85 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
@@ -25,22 +25,22 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 
 import java.util.concurrent.atomic.AtomicLong;
 
-public class TimeseriesStatistics {
+public class SchemaStatisticsManager {
 
   private final AtomicLong totalSeriesNumber = new AtomicLong();
 
-  private static class TimeseriesStatisticsHolder {
+  private static class SchemaStatisticsHolder {
 
-    private TimeseriesStatisticsHolder() {
+    private SchemaStatisticsHolder() {
       // allowed to do nothing
     }
 
-    private static final TimeseriesStatistics INSTANCE = new 
TimeseriesStatistics();
+    private static final SchemaStatisticsManager INSTANCE = new 
SchemaStatisticsManager();
   }
 
   /** we should not use this function in other place, but only in IoTDB class 
*/
-  public static TimeseriesStatistics getInstance() {
-    return TimeseriesStatisticsHolder.INSTANCE;
+  public static SchemaStatisticsManager getInstance() {
+    return SchemaStatisticsHolder.INSTANCE;
   }
 
   public void init() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 452f2c4a5b..2e1ff4801b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -52,7 +52,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGMemoryImpl;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
 import org.apache.iotdb.db.metadata.tag.TagManager;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -169,7 +169,7 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
   private boolean usingMLog = true;
   private MLogWriter logWriter;
 
-  private TimeseriesStatistics timeseriesStatistics = 
TimeseriesStatistics.getInstance();
+  private SchemaStatisticsManager schemaStatisticsManager = 
SchemaStatisticsManager.getInstance();
   private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
 
   private final IStorageGroupMNode storageGroupMNode;
@@ -455,7 +455,7 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
     // collect all the LeafMNode in this schema region
     List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
 
-    timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+    schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
 
     // drop triggers with no exceptions
     TriggerEngine.drop(leafMNodes);
@@ -602,7 +602,7 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
       mNodeCache.invalidate(path.getDevicePath());
 
       // update statistics and schemaDataTypeNumMap
-      timeseriesStatistics.addTimeseries(1);
+      schemaStatisticsManager.addTimeseries(1);
 
       // update tag index
       if (offset != -1 && isRecovering) {
@@ -719,7 +719,7 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
       mNodeCache.invalidate(prefixPath);
 
       // update statistics and schemaDataTypeNumMap
-      timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+      schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
 
       List<Long> tagOffsets = plan.getTagOffsets();
       for (int i = 0; i < measurements.size(); i++) {
@@ -861,7 +861,7 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
 
     mNodeCache.invalidate(node.getPartialPath());
 
-    timeseriesStatistics.deleteTimeseries(1);
+    schemaStatisticsManager.deleteTimeseries(1);
     return storageGroupPath;
   }
   // endregion
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 49fd1951b6..066b69902e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -50,7 +50,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGCachedImpl;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
 import org.apache.iotdb.db.metadata.tag.TagManager;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -163,7 +163,7 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
   private File logFile;
   private MLogWriter logWriter;
 
-  private TimeseriesStatistics timeseriesStatistics = 
TimeseriesStatistics.getInstance();
+  private SchemaStatisticsManager schemaStatisticsManager = 
SchemaStatisticsManager.getInstance();
   private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
 
   private final IStorageGroupMNode storageGroupMNode;
@@ -414,7 +414,7 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
     // collect all the LeafMNode in this schema region
     List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
 
-    timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+    schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
 
     // drop triggers with no exceptions
     TriggerEngine.drop(leafMNodes);
@@ -496,7 +496,7 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
         mNodeCache.invalidate(path.getDevicePath());
 
         // update statistics and schemaDataTypeNumMap
-        timeseriesStatistics.addTimeseries(1);
+        schemaStatisticsManager.addTimeseries(1);
 
         // update tag index
         if (offset != -1 && isRecovering) {
@@ -638,7 +638,7 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
         mNodeCache.invalidate(prefixPath);
 
         // update statistics and schemaDataTypeNumMap
-        timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+        schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
 
         List<Long> tagOffsets = plan.getTagOffsets();
         for (int i = 0; i < measurements.size(); i++) {
@@ -786,7 +786,7 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
 
     mNodeCache.invalidate(node.getPartialPath());
 
-    timeseriesStatistics.deleteTimeseries(1);
+    schemaStatisticsManager.deleteTimeseries(1);
     return storageGroupPath;
   }
   // endregion
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
index f27c56223c..50f50e4175 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
@@ -176,4 +176,27 @@ public class AggrWindowIterator implements 
ITimeRangeIterator {
   public long currentOutputTime() {
     return leftCRightO ? curTimeRange.getMin() : curTimeRange.getMax();
   }
+
+  @Override
+  public long getTotalIntervalNum() {
+    long queryRange = endTime - startTime;
+    long intervalNum;
+
+    if (isSlidingStepByMonth) {
+      intervalNum = (long) Math.ceil(queryRange / (double) (slidingStep * 
MS_TO_MONTH));
+      long retStartTime = DatetimeUtils.calcIntervalByMonth(startTime, 
intervalNum * slidingStep);
+      while (retStartTime > endTime) {
+        intervalNum -= 1;
+        retStartTime = DatetimeUtils.calcIntervalByMonth(startTime, 
intervalNum * slidingStep);
+      }
+    } else {
+      intervalNum = (long) Math.ceil(queryRange / (double) slidingStep);
+    }
+    return intervalNum;
+  }
+
+  public void reset() {
+    curTimeRange = null;
+    hasCachedTimeRange = false;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
index ccc999e810..47a6961fd7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
@@ -55,4 +55,6 @@ public interface ITimeRangeIterator {
    * @return minTime if leftCloseRightOpen, else maxTime.
    */
   long currentOutputTime();
+
+  long getTotalIntervalNum();
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java
index f7f6297ddb..c0b8c3b000 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java
@@ -173,4 +173,21 @@ public class PreAggrWindowIterator implements 
ITimeRangeIterator {
   public long currentOutputTime() {
     return leftCRightO ? curTimeRange.getMin() : curTimeRange.getMax();
   }
+
+  @Override
+  public long getTotalIntervalNum() {
+    long queryRange = endTime - startTime;
+    if (slidingStep >= interval || interval % slidingStep == 0) {
+      return (long) Math.ceil(queryRange / (double) slidingStep);
+    }
+
+    long interval1 = interval % slidingStep, interval2 = slidingStep - 
interval % slidingStep;
+    long intervalNum = Math.floorDiv(queryRange, interval1 + interval2);
+    long tmpStartTime = startTime + intervalNum * (interval1 + interval2);
+    if (tmpStartTime + interval1 > endTime) {
+      return intervalNum * 2 + 1;
+    } else {
+      return intervalNum * 2 + 2;
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
index d8eede01ef..87f6cb8630 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
@@ -147,4 +147,20 @@ public class PreAggrWindowWithNaturalMonthIterator 
implements ITimeRangeIterator
   public long currentOutputTime() {
     return leftCRightO ? curTimeRange.getMin() : curTimeRange.getMax();
   }
+
+  @Override
+  public long getTotalIntervalNum() {
+    long tmpInterval = 0;
+    while (hasNextTimeRange()) {
+      tmpInterval++;
+      nextTimeRange();
+    }
+
+    curTimeRange = null;
+    timeBoundaryHeap.clear();
+    aggrWindowIterator.reset();
+    initHeap();
+
+    return tmpInterval;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
index b998557604..4802267afa 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
@@ -71,4 +71,9 @@ public class SingleTimeWindowIterator implements 
ITimeRangeIterator {
   public long currentOutputTime() {
     return curTimeRange.getMin();
   }
+
+  @Override
+  public long getTotalIntervalNum() {
+    return 1;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index 5465800553..564f21245e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -19,20 +19,37 @@
 
 package org.apache.iotdb.db.mpp.execution.operator;
 
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import 
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import 
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.SingleTimeWindowIterator;
 import 
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.statistics.StatisticsManager;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
 
 public class AggregationUtil {
@@ -158,4 +175,68 @@ public class AggregationUtil {
     }
     return true;
   }
+
+  public static long calculateMaxAggregationResultSize(
+      List<? extends AggregationDescriptor> aggregationDescriptors,
+      ITimeRangeIterator timeRangeIterator,
+      boolean isGroupByQuery,
+      TypeProvider typeProvider) {
+    long valueColumnsSizePerLine = 0;
+    for (AggregationDescriptor descriptor : aggregationDescriptors) {
+      List<TSDataType> outPutDataTypes =
+          descriptor.getOutputColumnNames().stream()
+              .map(typeProvider::getType)
+              .collect(Collectors.toList());
+      for (TSDataType tsDataType : outPutDataTypes) {
+        checkArgument(
+            descriptor.getInputExpressions().get(0) instanceof 
TimeSeriesOperand,
+            "The input of aggregate function must be the original time 
series.");
+        PartialPath inputSeriesPath =
+            ((TimeSeriesOperand) 
descriptor.getInputExpressions().get(0)).getPath();
+        valueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType, 
inputSeriesPath);
+      }
+    }
+
+    return isGroupByQuery
+        ? Math.min(
+            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+            Math.min(
+                    
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
+                    timeRangeIterator.getTotalIntervalNum())
+                * (TimeColumn.SIZE_IN_BYTES_PER_POSITION + 
valueColumnsSizePerLine))
+        : valueColumnsSizePerLine;
+  }
+
+  public static long calculateMaxAggregationResultSizeForLastQuery(
+      List<Aggregator> aggregators, PartialPath inputSeriesPath) {
+    long valueColumnsSizePerLine = 0;
+    List<TSDataType> outPutDataTypes =
+        aggregators.stream()
+            .flatMap(aggregator -> Arrays.stream(aggregator.getOutputType()))
+            .collect(Collectors.toList());
+    for (TSDataType tsDataType : outPutDataTypes) {
+      valueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType, 
inputSeriesPath);
+    }
+    return valueColumnsSizePerLine;
+  }
+
+  private static long getOutputColumnSizePerLine(
+      TSDataType tsDataType, PartialPath inputSeriesPath) {
+    switch (tsDataType) {
+      case INT32:
+        return IntColumn.SIZE_IN_BYTES_PER_POSITION;
+      case INT64:
+        return LongColumn.SIZE_IN_BYTES_PER_POSITION;
+      case FLOAT:
+        return FloatColumn.SIZE_IN_BYTES_PER_POSITION;
+      case DOUBLE:
+        return DoubleColumn.SIZE_IN_BYTES_PER_POSITION;
+      case BOOLEAN:
+        return BooleanColumn.SIZE_IN_BYTES_PER_POSITION;
+      case TEXT:
+        return 
StatisticsManager.getInstance().getMaxBinarySizeInBytes(inputSeriesPath);
+      default:
+        throw new UnsupportedOperationException("Unknown data type " + 
tsDataType);
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 8cf4aef8b0..7efe992cdd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import 
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -37,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
-import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 
 /**
  * AggregationOperator can process the situation: aggregation of intermediate 
aggregate result, it
@@ -62,16 +60,20 @@ public class AggregationOperator implements ProcessOperator 
{
   // using for building result tsBlock
   private final TsBlockBuilder resultTsBlockBuilder;
 
+  private final long maxRetainedSize;
+  private final long childrenRetainedSize;
+  private final long maxReturnSize;
+
   public AggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       List<Operator> children,
-      boolean ascending,
-      GroupByTimeParameter groupByTimeParameter,
-      boolean outputPartialTimeWindow) {
+      long maxReturnSize) {
     this.operatorContext = operatorContext;
     this.children = children;
     this.aggregators = aggregators;
+    this.timeRangeIterator = timeRangeIterator;
 
     this.inputOperatorsCount = children.size();
     this.inputTsBlocks = new TsBlock[inputOperatorsCount];
@@ -80,14 +82,16 @@ public class AggregationOperator implements ProcessOperator 
{
       canCallNext[i] = false;
     }
 
-    this.timeRangeIterator =
-        initTimeRangeIterator(groupByTimeParameter, ascending, 
outputPartialTimeWindow);
-
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+    this.maxRetainedSize = 
children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
+    this.childrenRetainedSize =
+        
children.stream().mapToLong(Operator::calculateRetainedSizeAfterCallingNext).sum();
+    this.maxReturnSize = maxReturnSize;
   }
 
   @Override
@@ -95,6 +99,21 @@ public class AggregationOperator implements ProcessOperator {
     return operatorContext;
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize + maxRetainedSize + childrenRetainedSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return maxRetainedSize + childrenRetainedSize;
+  }
+
   @Override
   public ListenableFuture<?> isBlocked() {
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 88fb5fe20a..d95968850d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import 
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -45,10 +45,11 @@ public class RawDataAggregationOperator extends 
SingleInputAggregationOperator {
   public RawDataAggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       Operator child,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter) {
-    super(operatorContext, aggregators, child, ascending, 
groupByTimeParameter, true);
+      long maxReturnSize) {
+    super(operatorContext, aggregators, child, ascending, timeRangeIterator, 
maxReturnSize);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index c59de13988..1d746f1150 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import 
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -37,7 +36,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
-import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 
 public abstract class SingleInputAggregationOperator implements 
ProcessOperator {
 
@@ -57,26 +55,30 @@ public abstract class SingleInputAggregationOperator 
implements ProcessOperator
   // using for building result tsBlock
   protected final TsBlockBuilder resultTsBlockBuilder;
 
+  protected final long maxRetainedSize;
+  protected final long maxReturnSize;
+
   public SingleInputAggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
       Operator child,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter,
-      boolean outputPartialTimeWindow) {
+      ITimeRangeIterator timeRangeIterator,
+      long maxReturnSize) {
     this.operatorContext = operatorContext;
     this.ascending = ascending;
     this.child = child;
     this.aggregators = aggregators;
-
-    this.timeRangeIterator =
-        initTimeRangeIterator(groupByTimeParameter, ascending, 
outputPartialTimeWindow);
+    this.timeRangeIterator = timeRangeIterator;
 
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+    this.maxRetainedSize = child.calculateMaxReturnSize();
+    this.maxReturnSize = maxReturnSize;
   }
 
   @Override
@@ -146,4 +148,19 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
     curTimeRange = null;
     appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize + maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 10303016c5..d73491d3e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -40,10 +40,12 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
   public SlidingWindowAggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       Operator child,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter) {
-    super(operatorContext, aggregators, child, ascending, groupByTimeParameter, false);
+      GroupByTimeParameter groupByTimeParameter,
+      long maxReturnSize) {
+    super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
     checkArgument(
         groupByTimeParameter != null,
         "GroupByTimeParameter cannot be null in SlidingWindowAggregationOperator");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 2d7d671be9..e7b3f37885 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -40,7 +41,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
 
 public abstract class AbstractSeriesAggregationScanOperator implements DataSourceOperator {
@@ -68,14 +68,19 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
 
   protected boolean finished = false;
 
+  private final long maxRetainedSize;
+  private final long maxReturnSize;
+
   public AbstractSeriesAggregationScanOperator(
       PlanNodeId sourceId,
       OperatorContext context,
       SeriesScanUtil seriesScanUtil,
       int subSensorSize,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter) {
+      GroupByTimeParameter groupByTimeParameter,
+      long maxReturnSize) {
     this.sourceId = sourceId;
     this.operatorContext = context;
     this.ascending = ascending;
@@ -83,14 +88,17 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
     this.seriesScanUtil = seriesScanUtil;
     this.subSensorSize = subSensorSize;
     this.aggregators = aggregators;
-
-    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true);
+    this.timeRangeIterator = timeRangeIterator;
 
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+    this.maxRetainedSize =
+        (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    this.maxReturnSize = maxReturnSize;
   }
 
   @Override
@@ -108,6 +116,21 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
     seriesScanUtil.initQueryDataSource(dataSource);
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxRetainedSize + maxReturnSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return maxRetainedSize;
+  }
+
   @Override
   public boolean hasNext() {
     return timeRangeIterator.hasNextTimeRange();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
index f9fa1f8d52..2ad3eb1a83 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -37,9 +38,11 @@ public class AlignedSeriesAggregationScanOperator extends AbstractSeriesAggregat
       AlignedPath seriesPath,
       OperatorContext context,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       Filter timeFilter,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter) {
+      GroupByTimeParameter groupByTimeParameter,
+      long maxReturnSize) {
     super(
         sourceId,
         context,
@@ -52,7 +55,9 @@ public class AlignedSeriesAggregationScanOperator extends AbstractSeriesAggregat
             ascending),
         seriesPath.getMeasurementList().size(),
         aggregators,
+        timeRangeIterator,
         ascending,
-        groupByTimeParameter);
+        groupByTimeParameter,
+        maxReturnSize);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
index 157f51f2fa..99b24e06a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -44,9 +45,11 @@ public class SeriesAggregationScanOperator extends AbstractSeriesAggregationScan
       Set<String> allSensors,
       OperatorContext context,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       Filter timeFilter,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter) {
+      GroupByTimeParameter groupByTimeParameter,
+      long maxReturnSize) {
     super(
         sourceId,
         context,
@@ -60,7 +63,9 @@ public class SeriesAggregationScanOperator extends AbstractSeriesAggregationScan
             ascending),
         1,
         aggregators,
+        timeRangeIterator,
         ascending,
-        groupByTimeParameter);
+        groupByTimeParameter,
+        maxReturnSize);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 5c4570a1fe..0978a006f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -27,12 +27,14 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
 import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
+import org.apache.iotdb.db.mpp.execution.operator.AggregationUtil;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
@@ -150,6 +152,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
@@ -185,6 +188,9 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSize;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
 
 /** This Visitor is responsible for transferring PlanNode Tree to Operator Tree */
@@ -272,6 +278,58 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return seriesScanOperator;
   }
 
+  @Override
+  public Operator visitSeriesAggregationScan(
+      SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
+    PartialPath seriesPath = node.getSeriesPath();
+    boolean ascending = node.getScanOrder() == Ordering.ASC;
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                SeriesAggregationScanOperator.class.getSimpleName());
+
+    List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
+    List<Aggregator> aggregators = new ArrayList<>();
+    aggregationDescriptors.forEach(
+        o ->
+            aggregators.add(
+                new Aggregator(
+                    AccumulatorFactory.createAccumulator(
+                        o.getAggregationType(), node.getSeriesPath().getSeriesType(), ascending),
+                    o.getStep())));
+
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, true);
+    long maxReturnSize =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            node.getAggregationDescriptorList(),
+            timeRangeIterator,
+            groupByTimeParameter != null,
+            context.getTypeProvider());
+
+    SeriesAggregationScanOperator aggregateScanOperator =
+        new SeriesAggregationScanOperator(
+            node.getPlanNodeId(),
+            seriesPath,
+            context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
+            operatorContext,
+            aggregators,
+            timeRangeIterator,
+            node.getTimeFilter(),
+            ascending,
+            node.getGroupByTimeParameter(),
+            maxReturnSize);
+
+    context.addSourceOperator(aggregateScanOperator);
+    context.addPath(seriesPath);
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
+    return aggregateScanOperator;
+  }
+
   @Override
   public Operator visitAlignedSeriesAggregationScan(
       AlignedSeriesAggregationScanNode node, LocalExecutionPlanContext context) {
@@ -307,15 +365,27 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
               Collections.singletonList(new InputLocation[] {new InputLocation(0, seriesIndex)})));
     }
 
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, true);
+    long maxReturnSize =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            node.getAggregationDescriptorList(),
+            timeRangeIterator,
+            groupByTimeParameter != null,
+            context.getTypeProvider());
+
     AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
         new AlignedSeriesAggregationScanOperator(
             node.getPlanNodeId(),
             seriesPath,
             operatorContext,
             aggregators,
+            timeRangeIterator,
             node.getTimeFilter(),
             ascending,
-            node.getGroupByTimeParameter());
+            groupByTimeParameter,
+            maxReturnSize);
 
     context.addSourceOperator(seriesAggregationScanOperator);
     context.addPath(seriesPath);
@@ -555,47 +625,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return new NodePathsCountOperator(operatorContext, child);
   }
 
-  @Override
-  public Operator visitSeriesAggregationScan(
-      SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
-    PartialPath seriesPath = node.getSeriesPath();
-    boolean ascending = node.getScanOrder() == Ordering.ASC;
-    OperatorContext operatorContext =
-        context
-            .getInstanceContext()
-            .addOperatorContext(
-                context.getNextOperatorId(),
-                node.getPlanNodeId(),
-                SeriesAggregationScanOperator.class.getSimpleName());
-
-    List<Aggregator> aggregators = new ArrayList<>();
-    node.getAggregationDescriptorList()
-        .forEach(
-            o ->
-                aggregators.add(
-                    new Aggregator(
-                        AccumulatorFactory.createAccumulator(
-                            o.getAggregationType(),
-                            node.getSeriesPath().getSeriesType(),
-                            ascending),
-                        o.getStep())));
-    SeriesAggregationScanOperator aggregateScanOperator =
-        new SeriesAggregationScanOperator(
-            node.getPlanNodeId(),
-            seriesPath,
-            context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
-            operatorContext,
-            aggregators,
-            node.getTimeFilter(),
-            ascending,
-            node.getGroupByTimeParameter());
-
-    context.addSourceOperator(aggregateScanOperator);
-    context.addPath(seriesPath);
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
-    return aggregateScanOperator;
-  }
-
   @Override
   public Operator visitDeviceView(DeviceViewNode node, LocalExecutionPlanContext context) {
     OperatorContext operatorContext =
@@ -1016,7 +1045,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
     Map<String, List<InputLocation>> layout = makeLayout(node);
-    for (GroupByLevelDescriptor descriptor : node.getGroupByLevelDescriptors()) {
+    List<GroupByLevelDescriptor> aggregationDescriptors = node.getGroupByLevelDescriptors();
+    for (GroupByLevelDescriptor descriptor : aggregationDescriptors) {
       List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
       TSDataType seriesDataType =
           context
@@ -1038,9 +1068,19 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 node.getPlanNodeId(),
                 AggregationOperator.class.getSimpleName());
 
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, false);
+    long maxReturnSize =
+        calculateMaxAggregationResultSize(
+            aggregationDescriptors,
+            timeRangeIterator,
+            groupByTimeParameter != null,
+            context.getTypeProvider());
+
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
     return new AggregationOperator(
-        operatorContext, aggregators, children, ascending, node.getGroupByTimeParameter(), false);
+        operatorContext, aggregators, timeRangeIterator, children, maxReturnSize);
   }
 
   @Override
@@ -1060,7 +1100,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
     Map<String, List<InputLocation>> layout = makeLayout(node);
-    for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
+    List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
+    for (AggregationDescriptor descriptor : aggregationDescriptors) {
       List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
       aggregators.add(
           SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
@@ -1074,9 +1115,25 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
               descriptor.getStep()));
     }
 
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, false);
+    long maxReturnSize =
+        calculateMaxAggregationResultSize(
+            aggregationDescriptors,
+            timeRangeIterator,
+            groupByTimeParameter != null,
+            context.getTypeProvider());
+
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
     return new SlidingWindowAggregationOperator(
-        operatorContext, aggregators, child, ascending, node.getGroupByTimeParameter());
+        operatorContext,
+        aggregators,
+        timeRangeIterator,
+        child,
+        ascending,
+        groupByTimeParameter,
+        maxReturnSize);
   }
 
   @Override
@@ -1121,6 +1178,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
     Map<String, List<InputLocation>> layout = makeLayout(node);
+    List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
     for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
       List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
       aggregators.add(
@@ -1136,6 +1194,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
               inputLocationList));
     }
     boolean inputRaw = node.getAggregationDescriptorList().get(0).getStep().isInputRaw();
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+
     if (inputRaw) {
       checkArgument(children.size() == 1, "rawDataAggregateOperator can only accept one input");
       OperatorContext operatorContext =
@@ -1146,8 +1206,23 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                   node.getPlanNodeId(),
                   RawDataAggregationOperator.class.getSimpleName());
       context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
+
+      ITimeRangeIterator timeRangeIterator =
+          initTimeRangeIterator(groupByTimeParameter, ascending, true);
+      long maxReturnSize =
+          calculateMaxAggregationResultSize(
+              aggregationDescriptors,
+              timeRangeIterator,
+              groupByTimeParameter != null,
+              context.getTypeProvider());
+
       return new RawDataAggregationOperator(
-          operatorContext, aggregators, children.get(0), ascending, node.getGroupByTimeParameter());
+          operatorContext,
+          aggregators,
+          timeRangeIterator,
+          children.get(0),
+          ascending,
+          maxReturnSize);
     } else {
       OperatorContext operatorContext =
           context
@@ -1156,9 +1231,19 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                   context.getNextOperatorId(),
                   node.getPlanNodeId(),
                   AggregationOperator.class.getSimpleName());
+
+      ITimeRangeIterator timeRangeIterator =
+          initTimeRangeIterator(groupByTimeParameter, ascending, true);
+      long maxReturnSize =
+          calculateMaxAggregationResultSize(
+              aggregationDescriptors,
+              timeRangeIterator,
+              groupByTimeParameter != null,
+              context.getTypeProvider());
+
       context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
       return new AggregationOperator(
-          operatorContext, aggregators, children, ascending, node.getGroupByTimeParameter(), true);
+          operatorContext, aggregators, timeRangeIterator, children, maxReturnSize);
     }
   }
 
@@ -1400,6 +1485,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
 
     // last_time, last_value
     List<Aggregator> aggregators = LastQueryUtil.createAggregators(seriesPath.getSeriesType());
+    ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false, false);
+    long maxReturnSize =
+        calculateMaxAggregationResultSizeForLastQuery(
+            aggregators, seriesPath.transformToPartialPath());
 
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         new SeriesAggregationScanOperator(
@@ -1408,9 +1497,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
             operatorContext,
             aggregators,
+            timeRangeIterator,
             context.getLastQueryTimeFilter(),
             false,
-            null);
+            null,
+            maxReturnSize);
     context.addSourceOperator(seriesAggregationScanOperator);
     context.addPath(seriesPath);
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
@@ -1479,15 +1570,22 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     // last_time, last_value
     List<Aggregator> aggregators =
         
LastQueryUtil.createAggregators(seriesPath.getSchemaList().get(0).getType());
+    ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false, 
false);
+    long maxReturnSize =
+        calculateMaxAggregationResultSizeForLastQuery(
+            aggregators, seriesPath.transformToPartialPath());
+
     AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
         new AlignedSeriesAggregationScanOperator(
             node.getPlanNodeId(),
             seriesPath,
             operatorContext,
             aggregators,
+            timeRangeIterator,
             context.getLastQueryTimeFilter(),
             false,
-            null);
+            null,
+            maxReturnSize);
     context.addSourceOperator(seriesAggregationScanOperator);
     context.addPath(seriesPath);
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
new file mode 100644
index 0000000000..44d5fc1c66
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.statistics;
+
+import org.apache.iotdb.commons.path.PartialPath;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+public class StatisticsManager {
+
+  private final Map<PartialPath, TimeseriesStats> seriesToStatsMap = Maps.newConcurrentMap();
+
+  public long getMaxBinarySizeInBytes(PartialPath path) {
+    return 512 * Byte.BYTES;
+  }
+
+  public static StatisticsManager getInstance() {
+    return StatisticsManager.StatisticsManagerHelper.INSTANCE;
+  }
+
+  private static class StatisticsManagerHelper {
+
+    private static final StatisticsManager INSTANCE = new StatisticsManager();
+
+    private StatisticsManagerHelper() {}
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
new file mode 100644
index 0000000000..509341d3d5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.statistics;
+
+public class TimeseriesStats {
+  // TODO collect time series statistics
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
index 6233fa04ce..e25f592aa8 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
@@ -166,6 +166,11 @@ public class TimeSelector {
     return smallerChildIndex;
   }
 
+  public void clear() {
+    heapSize = 0;
+    lastTime = Long.MIN_VALUE;
+  }
+
   @Override
   public String toString() {
     return Arrays.toString(this.timeHeap);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
index 88e4b6bf16..1d3f3ccf79 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
@@ -295,6 +295,8 @@ public class TimeRangeIteratorTest {
   }
 
   private void checkRes(ITimeRangeIterator timeRangeIterator, String[] res) {
+    Assert.assertEquals(res.length, timeRangeIterator.getTotalIntervalNum());
+
     boolean isAscending = timeRangeIterator.isAscending();
     int cnt = isAscending ? 0 : res.length - 1;
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
index 233965aa3e..85e65849a6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
@@ -61,6 +61,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 
 public class AggregationOperatorTest {
@@ -319,9 +321,11 @@ public class AggregationOperatorTest {
             Collections.singleton("sensor0"),
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, true, true),
             null,
             true,
-            groupByTimeParameter);
+            groupByTimeParameter,
+            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
     List<TsFileResource> seqResources1 = new ArrayList<>();
     List<TsFileResource> unSeqResources1 = new ArrayList<>();
     seqResources1.add(seqResources.get(0));
@@ -341,9 +345,11 @@ public class AggregationOperatorTest {
             Collections.singleton("sensor0"),
             fragmentInstanceContext.getOperatorContexts().get(1),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, true, true),
             null,
             true,
-            groupByTimeParameter);
+            groupByTimeParameter,
+            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
     List<TsFileResource> seqResources2 = new ArrayList<>();
     List<TsFileResource> unSeqResources2 = new ArrayList<>();
     seqResources2.add(seqResources.get(2));
@@ -368,9 +374,8 @@ public class AggregationOperatorTest {
     return new AggregationOperator(
         fragmentInstanceContext.getOperatorContexts().get(2),
         finalAggregators,
+        initTimeRangeIterator(groupByTimeParameter, true, true),
         children,
-        true,
-        groupByTimeParameter,
-        true);
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
index 2277299102..a2e989f47e 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
@@ -63,6 +63,8 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -621,10 +623,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         1, planNodeId, SeriesScanOperator.class.getSimpleName());
     fragmentInstanceContext
         .getOperatorContexts()
-        .forEach(
-            operatorContext -> {
-              operatorContext.setMaxRunTime(TEST_TIME_SLICE);
-            });
+        .forEach(operatorContext -> 
operatorContext.setMaxRunTime(TEST_TIME_SLICE));
 
     AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
         new AlignedSeriesAggregationScanOperator(
@@ -632,9 +631,11 @@ public class AlignedSeriesAggregationScanOperatorTest {
             alignedPath,
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, ascending, true),
             timeFilter,
             ascending,
-            groupByTimeParameter);
+            groupByTimeParameter,
+            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
     seriesAggregationScanOperator.initQueryDataSource(
         new QueryDataSource(seqResources, unSeqResources));
     return seriesAggregationScanOperator;
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
index d914ec3ee7..4864fed39b 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
@@ -56,6 +56,8 @@ import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -133,9 +135,11 @@ public class LastQueryOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(0),
               aggregators1,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator1.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
@@ -155,9 +159,11 @@ public class LastQueryOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(2),
               aggregators2,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator2.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
@@ -252,9 +258,11 @@ public class LastQueryOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(0),
               aggregators1,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator1.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
@@ -274,9 +282,11 @@ public class LastQueryOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(2),
               aggregators2,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator2.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
index 2785f1d803..6dc84c071b 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
@@ -58,6 +58,8 @@ import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -135,9 +137,11 @@ public class LastQuerySortOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(0),
               aggregators1,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator1.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
@@ -157,9 +161,11 @@ public class LastQuerySortOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(2),
               aggregators2,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator2.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
@@ -255,9 +261,11 @@ public class LastQuerySortOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(0),
               aggregators1,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator1.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
@@ -277,9 +285,11 @@ public class LastQuerySortOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(2),
               aggregators2,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator2.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index d0598e6494..085f9829c6 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -22,11 +22,15 @@ import 
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import 
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
@@ -34,6 +38,8 @@ import 
org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperat
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
@@ -63,20 +69,29 @@ import 
org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesSchemaScanOpe
 import 
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
 import 
org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticAdditionColumnTransformer;
 import 
org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessEqualColumnTransformer;
 import 
org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
 import 
org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 import org.apache.iotdb.tsfile.read.common.type.BooleanType;
 import org.apache.iotdb.tsfile.read.common.type.LongType;
 import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
@@ -95,6 +110,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static 
org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator.MAP_NODE_RETRAINED_SIZE;
 import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
@@ -1073,4 +1089,448 @@ public class OperatorMemoryTest {
     assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize());
     assertEquals(expectedRetainedSize, 
operator.calculateRetainedSizeAfterCallingNext());
   }
+
+  /**
+   * Verifies the memory estimates of {@link SeriesAggregationScanOperator}: max peek memory, max
+   * return size and retained size after calling next, with and without group by time.
+   *
+   * <p>An {@link IllegalPathException} is propagated instead of being printed and swallowed, so
+   * the test runner reports the failure together with its stack trace.
+   *
+   * @throws IllegalPathException if the test series path cannot be constructed
+   */
+  @Test
+  public void seriesAggregationScanOperatorTest() throws IllegalPathException {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1", TSDataType.TEXT);
+      TypeProvider typeProvider = new TypeProvider();
+      typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+      typeProvider.setType("min_time(root.sg.d1.s1)", TSDataType.INT64);
+      typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+      // every case below expects the same retained size: twice the configured page size
+      long expectedMaxRetainSize =
+          2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+      // case1: without group by, step is SINGLE
+      SeriesAggregationScanOperator seriesAggregationScanOperator1 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              aggregationDescriptorsOf(
+                  measurementPath,
+                  AggregationStep.SINGLE,
+                  AggregationType.FIRST_VALUE,
+                  AggregationType.COUNT),
+              null,
+              typeProvider);
+      assertSeriesAggregationScanMemory(
+          seriesAggregationScanOperator1,
+          512 * Byte.BYTES + LongColumn.SIZE_IN_BYTES_PER_POSITION,
+          expectedMaxRetainSize);
+
+      // case2: without group by, step is PARTIAL
+      SeriesAggregationScanOperator seriesAggregationScanOperator2 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              aggregationDescriptorsOf(
+                  measurementPath,
+                  AggregationStep.PARTIAL,
+                  AggregationType.FIRST_VALUE,
+                  AggregationType.COUNT),
+              null,
+              typeProvider);
+      assertSeriesAggregationScanMemory(
+          seriesAggregationScanOperator2,
+          512 * Byte.BYTES + 2 * LongColumn.SIZE_IN_BYTES_PER_POSITION,
+          expectedMaxRetainSize);
+
+      long maxTsBlockLineNumber =
+          TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+
+      // case3: with group by, total window num < 1000
+      // (range 2 * maxTsBlockLineNumber split into steps of maxTsBlockLineNumber / 100: the
+      // expected size below is computed for 200 windows)
+      GroupByTimeParameter fewWindowsParameter =
+          new GroupByTimeParameter(
+              0,
+              2 * maxTsBlockLineNumber,
+              maxTsBlockLineNumber / 100,
+              maxTsBlockLineNumber / 100,
+              true);
+      SeriesAggregationScanOperator seriesAggregationScanOperator3 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              aggregationDescriptorsOf(
+                  measurementPath,
+                  AggregationStep.SINGLE,
+                  AggregationType.FIRST_VALUE,
+                  AggregationType.COUNT),
+              fewWindowsParameter,
+              typeProvider);
+      assertSeriesAggregationScanMemory(
+          seriesAggregationScanOperator3,
+          200
+              * (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+                  + 512 * Byte.BYTES
+                  + LongColumn.SIZE_IN_BYTES_PER_POSITION),
+          expectedMaxRetainSize);
+
+      // case4: with group by, total window num > 1000
+      // (one window per time unit: the expected row count is maxTsBlockLineNumber)
+      GroupByTimeParameter manyWindowsParameter =
+          new GroupByTimeParameter(0, 2 * maxTsBlockLineNumber, 1, 1, true);
+      SeriesAggregationScanOperator seriesAggregationScanOperator4 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              aggregationDescriptorsOf(
+                  measurementPath,
+                  AggregationStep.SINGLE,
+                  AggregationType.FIRST_VALUE,
+                  AggregationType.COUNT),
+              manyWindowsParameter,
+              typeProvider);
+      assertSeriesAggregationScanMemory(
+          seriesAggregationScanOperator4,
+          maxTsBlockLineNumber
+              * (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+                  + 512 * Byte.BYTES
+                  + LongColumn.SIZE_IN_BYTES_PER_POSITION),
+          expectedMaxRetainSize);
+
+      // case5: over DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES
+      // (three TEXT results per row: the expected return size is the default TsBlock limit)
+      SeriesAggregationScanOperator seriesAggregationScanOperator5 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              aggregationDescriptorsOf(
+                  measurementPath,
+                  AggregationStep.SINGLE,
+                  AggregationType.FIRST_VALUE,
+                  AggregationType.FIRST_VALUE,
+                  AggregationType.FIRST_VALUE),
+              manyWindowsParameter,
+              typeProvider);
+      assertSeriesAggregationScanMemory(
+          seriesAggregationScanOperator5, DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, expectedMaxRetainSize);
+    } finally {
+      // always release the notification thread, whether the assertions passed or not
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Builds one {@link AggregationDescriptor} per requested aggregation type, all using the same
+   * step and the same single input series.
+   *
+   * @param measurementPath series used as the only input expression of every descriptor
+   * @param step aggregation step shared by all descriptors
+   * @param types aggregation types, in the order the descriptors should appear
+   * @return the descriptors, in the order of {@code types}
+   */
+  private static List<AggregationDescriptor> aggregationDescriptorsOf(
+      MeasurementPath measurementPath, AggregationStep step, AggregationType... types) {
+    List<AggregationDescriptor> descriptors = new ArrayList<>(types.length);
+    for (AggregationType type : types) {
+      descriptors.add(
+          new AggregationDescriptor(
+              type.name().toLowerCase(),
+              step,
+              Collections.singletonList(new TimeSeriesOperand(measurementPath))));
+    }
+    return descriptors;
+  }
+
+  /**
+   * Asserts the three memory estimates of a series aggregation scan operator: the peek memory
+   * must equal return size plus retained size, and the other two must match the given values.
+   *
+   * @param operator operator under test
+   * @param expectedMaxReturnSize expected result of {@code calculateMaxReturnSize()}
+   * @param expectedMaxRetainSize expected result of {@code
+   *     calculateRetainedSizeAfterCallingNext()}
+   */
+  private static void assertSeriesAggregationScanMemory(
+      SeriesAggregationScanOperator operator,
+      long expectedMaxReturnSize,
+      long expectedMaxRetainSize) {
+    assertEquals(
+        expectedMaxReturnSize + expectedMaxRetainSize, operator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize());
+    assertEquals(expectedMaxRetainSize, operator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Creates a {@link SeriesAggregationScanOperator} for the memory-estimate tests.
+   *
+   * <p>A fresh fragment instance context is created for every call, one aggregator is built per
+   * descriptor, and the operator's max return size is computed with {@link
+   * AggregationUtil#calculateMaxAggregationResultSize}.
+   *
+   * @param instanceNotificationExecutor executor handed to the fragment instance state machine
+   * @param measurementPath series to scan; its data type is used to create the accumulators
+   * @param aggregationDescriptors aggregations to evaluate, one aggregator each
+   * @param groupByTimeParameter group by time settings, or {@code null} for no group by
+   * @param typeProvider output types used when computing the max result size
+   * @return the configured operator
+   * @throws IllegalPathException declared for callers; see the path-related constructors used
+   */
+  private SeriesAggregationScanOperator createSeriesAggregationScanOperator(
+      ExecutorService instanceNotificationExecutor,
+      MeasurementPath measurementPath,
+      List<AggregationDescriptor> aggregationDescriptors,
+      GroupByTimeParameter groupByTimeParameter,
+      TypeProvider typeProvider)
+      throws IllegalPathException {
+    Set<String> allSensors = Sets.newHashSet("s1");
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    PlanNodeId planNodeId = new PlanNodeId("1");
+    // label the context with the operator that is actually built here, not SeriesScanOperator
+    fragmentInstanceContext.addOperatorContext(
+        1, planNodeId, SeriesAggregationScanOperator.class.getSimpleName());
+
+    List<Aggregator> aggregators = new ArrayList<>(aggregationDescriptors.size());
+    for (AggregationDescriptor descriptor : aggregationDescriptors) {
+      aggregators.add(
+          new Aggregator(
+              AccumulatorFactory.createAccumulator(
+                  descriptor.getAggregationType(), measurementPath.getSeriesType(), true),
+              descriptor.getStep()));
+    }
+
+    ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, true, true);
+    long maxReturnSize =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            aggregationDescriptors, timeRangeIterator, groupByTimeParameter != null, typeProvider);
+
+    return new SeriesAggregationScanOperator(
+        planNodeId,
+        measurementPath,
+        allSensors,
+        fragmentInstanceContext.getOperatorContexts().get(0),
+        aggregators,
+        timeRangeIterator,
+        null,
+        true,
+        groupByTimeParameter,
+        maxReturnSize);
+  }
+
+  /**
+   * Checks the memory estimates of {@link RawDataAggregationOperator} on top of a mocked child:
+   * peek memory, max return size and retained size after calling next.
+   */
+  @Test
+  public void rawDataAggregationOperatorTest() throws IllegalPathException {
+    // mocked child reporting fixed sizes
+    Operator child = Mockito.mock(Operator.class);
+    Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+    Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+    Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+    MeasurementPath seriesPath = new MeasurementPath("root.sg.d1.s1", TSDataType.TEXT);
+    TypeProvider outputTypes = new TypeProvider();
+    outputTypes.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+    outputTypes.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+    AggregationDescriptor firstValueDescriptor =
+        new AggregationDescriptor(
+            AggregationType.FIRST_VALUE.name().toLowerCase(),
+            AggregationStep.FINAL,
+            Collections.singletonList(new TimeSeriesOperand(seriesPath)));
+    AggregationDescriptor countDescriptor =
+        new AggregationDescriptor(
+            AggregationType.COUNT.name().toLowerCase(),
+            AggregationStep.FINAL,
+            Collections.singletonList(new TimeSeriesOperand(seriesPath)));
+    List<AggregationDescriptor> descriptors =
+        Arrays.asList(firstValueDescriptor, countDescriptor);
+
+    // one aggregator per descriptor, in descriptor order
+    List<Aggregator> aggregatorList = new ArrayList<>();
+    for (AggregationDescriptor descriptor : descriptors) {
+      aggregatorList.add(
+          new Aggregator(
+              AccumulatorFactory.createAccumulator(
+                  descriptor.getAggregationType(), seriesPath.getSeriesType(), true),
+              descriptor.getStep()));
+    }
+
+    GroupByTimeParameter groupBy = new GroupByTimeParameter(0, 1000, 10, 10, true);
+    ITimeRangeIterator windows = initTimeRangeIterator(groupBy, true, false);
+    long resultSizeLimit =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            descriptors, windows, true, outputTypes);
+
+    OperatorContext mockedContext = Mockito.mock(OperatorContext.class);
+    RawDataAggregationOperator operatorUnderTest =
+        new RawDataAggregationOperator(
+            mockedContext, aggregatorList, windows, child, true, resultSizeLimit);
+
+    // expected size per result row: TEXT first_value + time column + INT64 count
+    int bytesPerRow =
+        512 * Byte.BYTES
+            + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+            + LongColumn.SIZE_IN_BYTES_PER_POSITION;
+    long expectedReturn = 100 * bytesPerRow;
+    long expectedRetain = child.calculateMaxReturnSize();
+    long childRetained = child.calculateRetainedSizeAfterCallingNext();
+
+    assertEquals(
+        expectedReturn + expectedRetain + childRetained,
+        operatorUnderTest.calculateMaxPeekMemory());
+    assertEquals(expectedReturn, operatorUnderTest.calculateMaxReturnSize());
+    assertEquals(
+        expectedRetain + childRetained,
+        operatorUnderTest.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Checks the memory estimates of {@link SlidingWindowAggregationOperator} on top of a mocked
+   * child, using a group by time whose sliding step (5) is smaller than its interval (10).
+   */
+  @Test
+  public void slidingWindowAggregationOperatorTest() throws IllegalPathException {
+    // mocked child reporting fixed sizes
+    Operator child = Mockito.mock(Operator.class);
+    Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+    Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+    Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+    MeasurementPath seriesPath = new MeasurementPath("root.sg.d1.s1", TSDataType.TEXT);
+    TypeProvider outputTypes = new TypeProvider();
+    outputTypes.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+    outputTypes.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+    List<AggregationDescriptor> descriptors =
+        Arrays.asList(
+            new AggregationDescriptor(
+                AggregationType.FIRST_VALUE.name().toLowerCase(),
+                AggregationStep.FINAL,
+                Collections.singletonList(new TimeSeriesOperand(seriesPath))),
+            new AggregationDescriptor(
+                AggregationType.COUNT.name().toLowerCase(),
+                AggregationStep.FINAL,
+                Collections.singletonList(new TimeSeriesOperand(seriesPath))));
+
+    // one aggregator per descriptor, in descriptor order
+    List<Aggregator> aggregatorList = new ArrayList<>();
+    for (AggregationDescriptor descriptor : descriptors) {
+      Aggregator aggregator =
+          new Aggregator(
+              AccumulatorFactory.createAccumulator(
+                  descriptor.getAggregationType(), seriesPath.getSeriesType(), true),
+              descriptor.getStep());
+      aggregatorList.add(aggregator);
+    }
+
+    GroupByTimeParameter groupBy = new GroupByTimeParameter(0, 1000, 10, 5, true);
+    ITimeRangeIterator windows = initTimeRangeIterator(groupBy, true, false);
+    long resultSizeLimit =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            descriptors, windows, true, outputTypes);
+
+    SlidingWindowAggregationOperator operatorUnderTest =
+        new SlidingWindowAggregationOperator(
+            Mockito.mock(OperatorContext.class),
+            aggregatorList,
+            windows,
+            child,
+            true,
+            groupBy,
+            resultSizeLimit);
+
+    // expected size per result row: TEXT first_value + time column + INT64 count
+    int bytesPerRow =
+        512 * Byte.BYTES
+            + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+            + LongColumn.SIZE_IN_BYTES_PER_POSITION;
+    long expectedReturn = 200 * bytesPerRow;
+    long expectedRetain = child.calculateMaxReturnSize();
+    long childRetained = child.calculateRetainedSizeAfterCallingNext();
+
+    assertEquals(
+        expectedReturn + expectedRetain + childRetained,
+        operatorUnderTest.calculateMaxPeekMemory());
+    assertEquals(expectedReturn, operatorUnderTest.calculateMaxReturnSize());
+    assertEquals(
+        expectedRetain + childRetained,
+        operatorUnderTest.calculateRetainedSizeAfterCallingNext());
+  }
+
+  @Test // Checks AggregationOperator's memory estimates against four mocked child operators.
+  public void aggregationOperatorTest() throws IllegalPathException {
+    List<Operator> children = new ArrayList<>(4);
+    long expectedChildrenRetainedSize = 0L; // running sum of the children's stubbed retained sizes
+    long expectedMaxRetainSize = 0L; // NOTE(review): presumably the children's summed max return sizes - confirm
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class); // stubbed below: 128 KiB peek, 64 KiB return, 64 KiB retained
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      
Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(64 * 
1024L);
+      expectedChildrenRetainedSize += 64 * 1024L;
+      expectedMaxRetainSize += 64 * 1024L;
+      children.add(child);
+    }
+
+    MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1", 
TSDataType.TEXT);
+    TypeProvider typeProvider = new TypeProvider(); // handed to calculateMaxAggregationResultSize below
+    typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+    typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+    List<AggregationDescriptor> aggregationDescriptors =
+        Arrays.asList(
+            new AggregationDescriptor(
+                AggregationType.FIRST_VALUE.name().toLowerCase(),
+                AggregationStep.FINAL,
+                Collections.singletonList(new 
TimeSeriesOperand(measurementPath))),
+            new AggregationDescriptor(
+                AggregationType.COUNT.name().toLowerCase(),
+                AggregationStep.FINAL,
+                Collections.singletonList(new 
TimeSeriesOperand(measurementPath))));
+
+    List<Aggregator> aggregators = new ArrayList<>(); // one Aggregator per descriptor, filled by the forEach
+    aggregationDescriptors.forEach(
+        o ->
+            aggregators.add(
+                new Aggregator(
+                    AccumulatorFactory.createAccumulator(
+                        o.getAggregationType(), 
measurementPath.getSeriesType(), true),
+                    o.getStep())));
+
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 
1000, 10, 10, true); // NOTE(review): presumably yields the 100 windows assumed below - confirm
+    ITimeRangeIterator timeRangeIterator = 
initTimeRangeIterator(groupByTimeParameter, true, false);
+    long maxReturnSize =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            aggregationDescriptors, timeRangeIterator, true, typeProvider);
+
+    AggregationOperator aggregationOperator =
+        new AggregationOperator(
+            Mockito.mock(OperatorContext.class),
+            aggregators,
+            timeRangeIterator,
+            children,
+            maxReturnSize);
+
+    long expectedMaxReturnSize = // 100 x (512 bytes + one time position + one long position)
+        100
+            * (512 * Byte.BYTES // NOTE(review): presumably the assumed size of one TEXT value - confirm
+                + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+                + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+
+    assertEquals( // peek memory: the operator's own result size plus both children sums
+        expectedMaxReturnSize + expectedMaxRetainSize + 
expectedChildrenRetainedSize,
+        aggregationOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, 
aggregationOperator.calculateMaxReturnSize());
+    assertEquals( // retained size: both children sums, without the operator's own result size
+        expectedMaxRetainSize + expectedChildrenRetainedSize,
+        aggregationOperator.calculateRetainedSizeAfterCallingNext());
+  }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index 6a9902f24b..f25b0d2fcd 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -64,6 +64,8 @@ import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 
 public class RawDataAggregationOperatorTest {
@@ -381,8 +383,9 @@ public class RawDataAggregationOperatorTest {
     return new RawDataAggregationOperator(
         fragmentInstanceContext.getOperatorContexts().get(3),
         aggregators,
+        initTimeRangeIterator(groupByTimeParameter, true, true),
         timeJoinOperator,
         true,
-        groupByTimeParameter);
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
index 71fd167404..f3b37b83f7 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -59,6 +59,8 @@ import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 
 public class SeriesAggregationScanOperatorTest {
@@ -515,9 +517,11 @@ public class SeriesAggregationScanOperatorTest {
             allSensors,
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, ascending, true),
             timeFilter,
             ascending,
-            groupByTimeParameter);
+            groupByTimeParameter,
+            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
     seriesAggregationScanOperator.initQueryDataSource(
         new QueryDataSource(seqResources, unSeqResources));
     return seriesAggregationScanOperator;
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
index 81ab99e1ff..a0ed242e63 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
@@ -61,6 +61,8 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class SlidingWindowAggregationOperatorTest {
 
@@ -232,9 +234,11 @@ public class SlidingWindowAggregationOperatorTest {
             Collections.singleton("sensor0"),
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, ascending, true),
             null,
             ascending,
-            groupByTimeParameter);
+            groupByTimeParameter,
+            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
     seriesAggregationScanOperator.initQueryDataSource(
         new QueryDataSource(seqResources, unSeqResources));
 
@@ -254,8 +258,10 @@ public class SlidingWindowAggregationOperatorTest {
     return new SlidingWindowAggregationOperator(
         fragmentInstanceContext.getOperatorContexts().get(1),
         finalAggregators,
+        initTimeRangeIterator(groupByTimeParameter, ascending, false),
         seriesAggregationScanOperator,
         ascending,
-        groupByTimeParameter);
+        groupByTimeParameter,
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
index 34b9b8fa87..bff57278b1 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -56,6 +56,8 @@ import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -211,9 +213,11 @@ public class UpdateLastCacheOperatorTest {
             allSensors,
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, ascending, true),
             timeFilter,
             ascending,
-            groupByTimeParameter);
+            groupByTimeParameter,
+            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
     seriesAggregationScanOperator.initQueryDataSource(
         new QueryDataSource(seqResources, unSeqResources));
 

Reply via email to