This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/lmh/AggOpMemoryControl by this push:
     new 632b390c2f add UTs
632b390c2f is described below

commit 632b390c2f5670cd608ca82717ae2b8e359c0ad8
Author: Minghui Liu <[email protected]>
AuthorDate: Mon Aug 15 10:51:43 2022 +0800

    add UTs
---
 .../mpp/execution/operator/OperatorMemoryTest.java | 460 +++++++++++++++++++++
 1 file changed, 460 insertions(+)

diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index d0598e6494..085f9829c6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -22,11 +22,15 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import 
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
@@ -34,6 +38,8 @@ import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperat
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
@@ -63,20 +69,29 @@ import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesSchemaScanOpe
 import 
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
 import 
org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticAdditionColumnTransformer;
 import 
org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessEqualColumnTransformer;
 import 
org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
 import 
org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 import org.apache.iotdb.tsfile.read.common.type.BooleanType;
 import org.apache.iotdb.tsfile.read.common.type.LongType;
 import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
@@ -95,6 +110,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static 
org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator.MAP_NODE_RETRAINED_SIZE;
 import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
@@ -1073,4 +1089,448 @@ public class OperatorMemoryTest {
     assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize());
     assertEquals(expectedRetainedSize, 
operator.calculateRetainedSizeAfterCallingNext());
   }
+
+  @Test
+  public void seriesAggregationScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1", 
TSDataType.TEXT);
+      TypeProvider typeProvider = new TypeProvider();
+      typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+      typeProvider.setType("min_time(root.sg.d1.s1)", TSDataType.INT64);
+      typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+      // case1: without group by, step is SINGLE
+      List<AggregationDescriptor> aggregationDescriptors1 =
+          Arrays.asList(
+              new AggregationDescriptor(
+                  AggregationType.FIRST_VALUE.name().toLowerCase(),
+                  AggregationStep.SINGLE,
+                  Collections.singletonList(new 
TimeSeriesOperand(measurementPath))),
+              new AggregationDescriptor(
+                  AggregationType.COUNT.name().toLowerCase(),
+                  AggregationStep.SINGLE,
+                  Collections.singletonList(new 
TimeSeriesOperand(measurementPath))));
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator1 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              aggregationDescriptors1,
+              null,
+              typeProvider);
+
+      long expectedMaxReturnSize = 512 * Byte.BYTES + 
LongColumn.SIZE_IN_BYTES_PER_POSITION;
+      long expectedMaxRetainSize =
+          2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+      assertEquals(
+          expectedMaxReturnSize + expectedMaxRetainSize,
+          seriesAggregationScanOperator1.calculateMaxPeekMemory());
+      assertEquals(expectedMaxReturnSize, 
seriesAggregationScanOperator1.calculateMaxReturnSize());
+      assertEquals(
+          expectedMaxRetainSize,
+          
seriesAggregationScanOperator1.calculateRetainedSizeAfterCallingNext());
+
+      // case2: without group by, step is PARTIAL
+      List<AggregationDescriptor> aggregationDescriptors2 =
+          Arrays.asList(
+              new AggregationDescriptor(
+                  AggregationType.FIRST_VALUE.name().toLowerCase(),
+                  AggregationStep.PARTIAL,
+                  Collections.singletonList(new 
TimeSeriesOperand(measurementPath))),
+              new AggregationDescriptor(
+                  AggregationType.COUNT.name().toLowerCase(),
+                  AggregationStep.PARTIAL,
+                  Collections.singletonList(new 
TimeSeriesOperand(measurementPath))));
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator2 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              aggregationDescriptors2,
+              null,
+              typeProvider);
+
+      expectedMaxReturnSize = 512 * Byte.BYTES + 2 * 
LongColumn.SIZE_IN_BYTES_PER_POSITION;
+      expectedMaxRetainSize = 2L * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+      assertEquals(
+          expectedMaxReturnSize + expectedMaxRetainSize,
+          seriesAggregationScanOperator2.calculateMaxPeekMemory());
+      assertEquals(expectedMaxReturnSize, 
seriesAggregationScanOperator2.calculateMaxReturnSize());
+      assertEquals(
+          expectedMaxRetainSize,
+          
seriesAggregationScanOperator2.calculateRetainedSizeAfterCallingNext());
+
+      long maxTsBlockLineNumber =
+          TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+
+      // case3: with group by, total window num < 1000
+      GroupByTimeParameter groupByTimeParameter =
+          new GroupByTimeParameter(
+              0,
+              2 * maxTsBlockLineNumber,
+              maxTsBlockLineNumber / 100,
+              maxTsBlockLineNumber / 100,
+              true);
+      List<AggregationDescriptor> aggregationDescriptors3 =
+          Arrays.asList(
+              new AggregationDescriptor(
+                  AggregationType.FIRST_VALUE.name().toLowerCase(),
+                  AggregationStep.SINGLE,
+                  Collections.singletonList(new 
TimeSeriesOperand(measurementPath))),
+              new AggregationDescriptor(
+                  AggregationType.COUNT.name().toLowerCase(),
+                  AggregationStep.SINGLE,
+                  Collections.singletonList(new 
TimeSeriesOperand(measurementPath))));
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator3 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              aggregationDescriptors3,
+              groupByTimeParameter,
+              typeProvider);
+
+      expectedMaxReturnSize =
+          200
+              * (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+                  + 512 * Byte.BYTES
+                  + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+      expectedMaxRetainSize = 2L * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+      assertEquals(
+          expectedMaxReturnSize + expectedMaxRetainSize,
+          seriesAggregationScanOperator3.calculateMaxPeekMemory());
+      assertEquals(expectedMaxReturnSize, 
seriesAggregationScanOperator3.calculateMaxReturnSize());
+      assertEquals(
+          expectedMaxRetainSize,
+          
seriesAggregationScanOperator3.calculateRetainedSizeAfterCallingNext());
+
+      // case4: with group by, total window num > 1000
+      groupByTimeParameter = new GroupByTimeParameter(0, 2 * 
maxTsBlockLineNumber, 1, 1, true);
+      List<AggregationDescriptor> aggregationDescriptors4 =
+          Arrays.asList(
+              new AggregationDescriptor(
+                  AggregationType.FIRST_VALUE.name().toLowerCase(),
+                  AggregationStep.SINGLE,
+                  Collections.singletonList(new 
TimeSeriesOperand(measurementPath))),
+              new AggregationDescriptor(
+                  AggregationType.COUNT.name().toLowerCase(),
+                  AggregationStep.SINGLE,
+                  Collections.singletonList(new 
TimeSeriesOperand(measurementPath))));
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator4 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              aggregationDescriptors4,
+              groupByTimeParameter,
+              typeProvider);
+
+      expectedMaxReturnSize =
+          maxTsBlockLineNumber
+              * (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+                  + 512 * Byte.BYTES
+                  + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+      expectedMaxRetainSize = 2L * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+      assertEquals(
+          expectedMaxReturnSize + expectedMaxRetainSize,
+          seriesAggregationScanOperator4.calculateMaxPeekMemory());
+      assertEquals(expectedMaxReturnSize, 
seriesAggregationScanOperator4.calculateMaxReturnSize());
+      assertEquals(
+          expectedMaxRetainSize,
+          
seriesAggregationScanOperator4.calculateRetainedSizeAfterCallingNext());
+
+      // case5: over DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES
+      groupByTimeParameter = new GroupByTimeParameter(0, 2 * 
maxTsBlockLineNumber, 1, 1, true);
+      List<AggregationDescriptor> aggregationDescriptors5 =
+          Arrays.asList(
+              new AggregationDescriptor(
+                  AggregationType.FIRST_VALUE.name().toLowerCase(),
+                  AggregationStep.SINGLE,
+                  Collections.singletonList(new 
TimeSeriesOperand(measurementPath))),
+              new AggregationDescriptor(
+                  AggregationType.FIRST_VALUE.name().toLowerCase(),
+                  AggregationStep.SINGLE,
+                  Collections.singletonList(new 
TimeSeriesOperand(measurementPath))),
+              new AggregationDescriptor(
+                  AggregationType.FIRST_VALUE.name().toLowerCase(),
+                  AggregationStep.SINGLE,
+                  Collections.singletonList(new 
TimeSeriesOperand(measurementPath))));
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator5 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              aggregationDescriptors5,
+              groupByTimeParameter,
+              typeProvider);
+
+      expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+      expectedMaxRetainSize = 2L * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+      assertEquals(
+          expectedMaxReturnSize + expectedMaxRetainSize,
+          seriesAggregationScanOperator5.calculateMaxPeekMemory());
+      assertEquals(expectedMaxReturnSize, 
seriesAggregationScanOperator5.calculateMaxReturnSize());
+      assertEquals(
+          expectedMaxRetainSize,
+          
seriesAggregationScanOperator5.calculateRetainedSizeAfterCallingNext());
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  private SeriesAggregationScanOperator createSeriesAggregationScanOperator(
+      ExecutorService instanceNotificationExecutor,
+      MeasurementPath measurementPath,
+      List<AggregationDescriptor> aggregationDescriptors,
+      GroupByTimeParameter groupByTimeParameter,
+      TypeProvider typeProvider)
+      throws IllegalPathException {
+    Set<String> allSensors = Sets.newHashSet("s1");
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    PlanNodeId planNodeId = new PlanNodeId("1");
+    fragmentInstanceContext.addOperatorContext(
+        1, planNodeId, SeriesScanOperator.class.getSimpleName());
+
+    List<Aggregator> aggregators = new ArrayList<>();
+    aggregationDescriptors.forEach(
+        o ->
+            aggregators.add(
+                new Aggregator(
+                    AccumulatorFactory.createAccumulator(
+                        o.getAggregationType(), 
measurementPath.getSeriesType(), true),
+                    o.getStep())));
+
+    ITimeRangeIterator timeRangeIterator = 
initTimeRangeIterator(groupByTimeParameter, true, true);
+    long maxReturnSize =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            aggregationDescriptors, timeRangeIterator, groupByTimeParameter != 
null, typeProvider);
+
+    return new SeriesAggregationScanOperator(
+        planNodeId,
+        measurementPath,
+        allSensors,
+        fragmentInstanceContext.getOperatorContexts().get(0),
+        aggregators,
+        timeRangeIterator,
+        null,
+        true,
+        groupByTimeParameter,
+        maxReturnSize);
+  }
+
+  @Test
+  public void rawDataAggregationOperatorTest() throws IllegalPathException {
+    Operator child = Mockito.mock(Operator.class);
+    Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+    Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+    
Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+    MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1", 
TSDataType.TEXT);
+    TypeProvider typeProvider = new TypeProvider();
+    typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+    typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+    List<AggregationDescriptor> aggregationDescriptors =
+        Arrays.asList(
+            new AggregationDescriptor(
+                AggregationType.FIRST_VALUE.name().toLowerCase(),
+                AggregationStep.FINAL,
+                Collections.singletonList(new 
TimeSeriesOperand(measurementPath))),
+            new AggregationDescriptor(
+                AggregationType.COUNT.name().toLowerCase(),
+                AggregationStep.FINAL,
+                Collections.singletonList(new 
TimeSeriesOperand(measurementPath))));
+
+    List<Aggregator> aggregators = new ArrayList<>();
+    aggregationDescriptors.forEach(
+        o ->
+            aggregators.add(
+                new Aggregator(
+                    AccumulatorFactory.createAccumulator(
+                        o.getAggregationType(), 
measurementPath.getSeriesType(), true),
+                    o.getStep())));
+
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 
1000, 10, 10, true);
+    ITimeRangeIterator timeRangeIterator = 
initTimeRangeIterator(groupByTimeParameter, true, false);
+    long maxReturnSize =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            aggregationDescriptors, timeRangeIterator, true, typeProvider);
+
+    RawDataAggregationOperator rawDataAggregationOperator =
+        new RawDataAggregationOperator(
+            Mockito.mock(OperatorContext.class),
+            aggregators,
+            timeRangeIterator,
+            child,
+            true,
+            maxReturnSize);
+
+    long expectedMaxReturnSize =
+        100
+            * (512 * Byte.BYTES
+                + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+                + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+    long expectedMaxRetainSize = child.calculateMaxReturnSize();
+
+    assertEquals(
+        expectedMaxReturnSize
+            + expectedMaxRetainSize
+            + child.calculateRetainedSizeAfterCallingNext(),
+        rawDataAggregationOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, 
rawDataAggregationOperator.calculateMaxReturnSize());
+    assertEquals(
+        expectedMaxRetainSize + child.calculateRetainedSizeAfterCallingNext(),
+        rawDataAggregationOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  @Test
+  public void slidingWindowAggregationOperatorTest() throws 
IllegalPathException {
+    Operator child = Mockito.mock(Operator.class);
+    Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+    Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+    
Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+    MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1", 
TSDataType.TEXT);
+    TypeProvider typeProvider = new TypeProvider();
+    typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+    typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+    List<AggregationDescriptor> aggregationDescriptors =
+        Arrays.asList(
+            new AggregationDescriptor(
+                AggregationType.FIRST_VALUE.name().toLowerCase(),
+                AggregationStep.FINAL,
+                Collections.singletonList(new 
TimeSeriesOperand(measurementPath))),
+            new AggregationDescriptor(
+                AggregationType.COUNT.name().toLowerCase(),
+                AggregationStep.FINAL,
+                Collections.singletonList(new 
TimeSeriesOperand(measurementPath))));
+
+    List<Aggregator> aggregators = new ArrayList<>();
+    aggregationDescriptors.forEach(
+        o ->
+            aggregators.add(
+                new Aggregator(
+                    AccumulatorFactory.createAccumulator(
+                        o.getAggregationType(), 
measurementPath.getSeriesType(), true),
+                    o.getStep())));
+
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 
1000, 10, 5, true);
+    ITimeRangeIterator timeRangeIterator = 
initTimeRangeIterator(groupByTimeParameter, true, false);
+    long maxReturnSize =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            aggregationDescriptors, timeRangeIterator, true, typeProvider);
+
+    SlidingWindowAggregationOperator slidingWindowAggregationOperator =
+        new SlidingWindowAggregationOperator(
+            Mockito.mock(OperatorContext.class),
+            aggregators,
+            timeRangeIterator,
+            child,
+            true,
+            groupByTimeParameter,
+            maxReturnSize);
+
+    long expectedMaxReturnSize =
+        200
+            * (512 * Byte.BYTES
+                + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+                + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+    long expectedMaxRetainSize = child.calculateMaxReturnSize();
+
+    assertEquals(
+        expectedMaxReturnSize
+            + expectedMaxRetainSize
+            + child.calculateRetainedSizeAfterCallingNext(),
+        slidingWindowAggregationOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, 
slidingWindowAggregationOperator.calculateMaxReturnSize());
+    assertEquals(
+        expectedMaxRetainSize + child.calculateRetainedSizeAfterCallingNext(),
+        
slidingWindowAggregationOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  @Test
+  public void aggregationOperatorTest() throws IllegalPathException {
+    List<Operator> children = new ArrayList<>(4);
+    long expectedChildrenRetainedSize = 0L;
+    long expectedMaxRetainSize = 0L;
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      
Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(64 * 
1024L);
+      expectedChildrenRetainedSize += 64 * 1024L;
+      expectedMaxRetainSize += 64 * 1024L;
+      children.add(child);
+    }
+
+    MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1", 
TSDataType.TEXT);
+    TypeProvider typeProvider = new TypeProvider();
+    typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+    typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+    List<AggregationDescriptor> aggregationDescriptors =
+        Arrays.asList(
+            new AggregationDescriptor(
+                AggregationType.FIRST_VALUE.name().toLowerCase(),
+                AggregationStep.FINAL,
+                Collections.singletonList(new 
TimeSeriesOperand(measurementPath))),
+            new AggregationDescriptor(
+                AggregationType.COUNT.name().toLowerCase(),
+                AggregationStep.FINAL,
+                Collections.singletonList(new 
TimeSeriesOperand(measurementPath))));
+
+    List<Aggregator> aggregators = new ArrayList<>();
+    aggregationDescriptors.forEach(
+        o ->
+            aggregators.add(
+                new Aggregator(
+                    AccumulatorFactory.createAccumulator(
+                        o.getAggregationType(), 
measurementPath.getSeriesType(), true),
+                    o.getStep())));
+
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 
1000, 10, 10, true);
+    ITimeRangeIterator timeRangeIterator = 
initTimeRangeIterator(groupByTimeParameter, true, false);
+    long maxReturnSize =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            aggregationDescriptors, timeRangeIterator, true, typeProvider);
+
+    AggregationOperator aggregationOperator =
+        new AggregationOperator(
+            Mockito.mock(OperatorContext.class),
+            aggregators,
+            timeRangeIterator,
+            children,
+            maxReturnSize);
+
+    long expectedMaxReturnSize =
+        100
+            * (512 * Byte.BYTES
+                + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+                + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+
+    assertEquals(
+        expectedMaxReturnSize + expectedMaxRetainSize + 
expectedChildrenRetainedSize,
+        aggregationOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, 
aggregationOperator.calculateMaxReturnSize());
+    assertEquals(
+        expectedMaxRetainSize + expectedChildrenRetainedSize,
+        aggregationOperator.calculateRetainedSizeAfterCallingNext());
+  }
 }

Reply via email to