This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/lmh/AggOpMemoryControl by this
push:
new 632b390c2f add UTs
632b390c2f is described below
commit 632b390c2f5670cd608ca82717ae2b8e359c0ad8
Author: Minghui Liu <[email protected]>
AuthorDate: Mon Aug 15 10:51:43 2022 +0800
add UTs
---
.../mpp/execution/operator/OperatorMemoryTest.java | 460 +++++++++++++++++++++
1 file changed, 460 insertions(+)
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index d0598e6494..085f9829c6 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -22,11 +22,15 @@ import
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
@@ -34,6 +38,8 @@ import
org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperat
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
import
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
@@ -63,20 +69,29 @@ import
org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesSchemaScanOpe
import
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
import
org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticAdditionColumnTransformer;
import
org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessEqualColumnTransformer;
import
org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
import
org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.read.common.type.BooleanType;
import org.apache.iotdb.tsfile.read.common.type.LongType;
import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
@@ -95,6 +110,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static
org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator.MAP_NODE_RETRAINED_SIZE;
import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
@@ -1073,4 +1089,448 @@ public class OperatorMemoryTest {
assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize());
assertEquals(expectedRetainedSize,
operator.calculateRetainedSizeAfterCallingNext());
}
+
+ @Test
+ public void seriesAggregationScanOperatorTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ try {
+ MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1",
TSDataType.TEXT);
+ TypeProvider typeProvider = new TypeProvider();
+ typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+ typeProvider.setType("min_time(root.sg.d1.s1)", TSDataType.INT64);
+ typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+ // case1: without group by, step is SINGLE
+ List<AggregationDescriptor> aggregationDescriptors1 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator1 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors1,
+ null,
+ typeProvider);
+
+ long expectedMaxReturnSize = 512 * Byte.BYTES +
LongColumn.SIZE_IN_BYTES_PER_POSITION;
+ long expectedMaxRetainSize =
+ 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator1.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
seriesAggregationScanOperator1.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+
seriesAggregationScanOperator1.calculateRetainedSizeAfterCallingNext());
+
+ // case2: without group by, step is PARTIAL
+ List<AggregationDescriptor> aggregationDescriptors2 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.PARTIAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.PARTIAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator2 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors2,
+ null,
+ typeProvider);
+
+ expectedMaxReturnSize = 512 * Byte.BYTES + 2 *
LongColumn.SIZE_IN_BYTES_PER_POSITION;
+ expectedMaxRetainSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator2.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
seriesAggregationScanOperator2.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+
seriesAggregationScanOperator2.calculateRetainedSizeAfterCallingNext());
+
+ long maxTsBlockLineNumber =
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+
+ // case3: with group by, total window num < 1000
+ GroupByTimeParameter groupByTimeParameter =
+ new GroupByTimeParameter(
+ 0,
+ 2 * maxTsBlockLineNumber,
+ maxTsBlockLineNumber / 100,
+ maxTsBlockLineNumber / 100,
+ true);
+ List<AggregationDescriptor> aggregationDescriptors3 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator3 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors3,
+ groupByTimeParameter,
+ typeProvider);
+
+ expectedMaxReturnSize =
+ 200
+ * (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ + 512 * Byte.BYTES
+ + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+ expectedMaxRetainSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator3.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
seriesAggregationScanOperator3.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+
seriesAggregationScanOperator3.calculateRetainedSizeAfterCallingNext());
+
+ // case4: with group by, total window num > 1000
+ groupByTimeParameter = new GroupByTimeParameter(0, 2 *
maxTsBlockLineNumber, 1, 1, true);
+ List<AggregationDescriptor> aggregationDescriptors4 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator4 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors4,
+ groupByTimeParameter,
+ typeProvider);
+
+ expectedMaxReturnSize =
+ maxTsBlockLineNumber
+ * (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ + 512 * Byte.BYTES
+ + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+ expectedMaxRetainSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator4.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
seriesAggregationScanOperator4.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+
seriesAggregationScanOperator4.calculateRetainedSizeAfterCallingNext());
+
+ // case5: over DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES
+ groupByTimeParameter = new GroupByTimeParameter(0, 2 *
maxTsBlockLineNumber, 1, 1, true);
+ List<AggregationDescriptor> aggregationDescriptors5 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator5 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors5,
+ groupByTimeParameter,
+ typeProvider);
+
+ expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ expectedMaxRetainSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator5.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
seriesAggregationScanOperator5.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+
seriesAggregationScanOperator5.calculateRetainedSizeAfterCallingNext());
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ private SeriesAggregationScanOperator createSeriesAggregationScanOperator(
+ ExecutorService instanceNotificationExecutor,
+ MeasurementPath measurementPath,
+ List<AggregationDescriptor> aggregationDescriptors,
+ GroupByTimeParameter groupByTimeParameter,
+ TypeProvider typeProvider)
+ throws IllegalPathException {
+ Set<String> allSensors = Sets.newHashSet("s1");
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId, SeriesScanOperator.class.getSimpleName());
+
+ List<Aggregator> aggregators = new ArrayList<>();
+ aggregationDescriptors.forEach(
+ o ->
+ aggregators.add(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ o.getAggregationType(),
measurementPath.getSeriesType(), true),
+ o.getStep())));
+
+ ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, true, true);
+ long maxReturnSize =
+ AggregationUtil.calculateMaxAggregationResultSize(
+ aggregationDescriptors, timeRangeIterator, groupByTimeParameter !=
null, typeProvider);
+
+ return new SeriesAggregationScanOperator(
+ planNodeId,
+ measurementPath,
+ allSensors,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ aggregators,
+ timeRangeIterator,
+ null,
+ true,
+ groupByTimeParameter,
+ maxReturnSize);
+ }
+
+ @Test
+ public void rawDataAggregationOperatorTest() throws IllegalPathException {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+
Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+ MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1",
TSDataType.TEXT);
+ TypeProvider typeProvider = new TypeProvider();
+ typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+ typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+ List<AggregationDescriptor> aggregationDescriptors =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.FINAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.FINAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ List<Aggregator> aggregators = new ArrayList<>();
+ aggregationDescriptors.forEach(
+ o ->
+ aggregators.add(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ o.getAggregationType(),
measurementPath.getSeriesType(), true),
+ o.getStep())));
+
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0,
1000, 10, 10, true);
+ ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, true, false);
+ long maxReturnSize =
+ AggregationUtil.calculateMaxAggregationResultSize(
+ aggregationDescriptors, timeRangeIterator, true, typeProvider);
+
+ RawDataAggregationOperator rawDataAggregationOperator =
+ new RawDataAggregationOperator(
+ Mockito.mock(OperatorContext.class),
+ aggregators,
+ timeRangeIterator,
+ child,
+ true,
+ maxReturnSize);
+
+ long expectedMaxReturnSize =
+ 100
+ * (512 * Byte.BYTES
+ + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+ long expectedMaxRetainSize = child.calculateMaxReturnSize();
+
+ assertEquals(
+ expectedMaxReturnSize
+ + expectedMaxRetainSize
+ + child.calculateRetainedSizeAfterCallingNext(),
+ rawDataAggregationOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
rawDataAggregationOperator.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize + child.calculateRetainedSizeAfterCallingNext(),
+ rawDataAggregationOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ @Test
+ public void slidingWindowAggregationOperatorTest() throws
IllegalPathException {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+
Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+ MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1",
TSDataType.TEXT);
+ TypeProvider typeProvider = new TypeProvider();
+ typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+ typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+ List<AggregationDescriptor> aggregationDescriptors =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.FINAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.FINAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ List<Aggregator> aggregators = new ArrayList<>();
+ aggregationDescriptors.forEach(
+ o ->
+ aggregators.add(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ o.getAggregationType(),
measurementPath.getSeriesType(), true),
+ o.getStep())));
+
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0,
1000, 10, 5, true);
+ ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, true, false);
+ long maxReturnSize =
+ AggregationUtil.calculateMaxAggregationResultSize(
+ aggregationDescriptors, timeRangeIterator, true, typeProvider);
+
+ SlidingWindowAggregationOperator slidingWindowAggregationOperator =
+ new SlidingWindowAggregationOperator(
+ Mockito.mock(OperatorContext.class),
+ aggregators,
+ timeRangeIterator,
+ child,
+ true,
+ groupByTimeParameter,
+ maxReturnSize);
+
+ long expectedMaxReturnSize =
+ 200
+ * (512 * Byte.BYTES
+ + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+ long expectedMaxRetainSize = child.calculateMaxReturnSize();
+
+ assertEquals(
+ expectedMaxReturnSize
+ + expectedMaxRetainSize
+ + child.calculateRetainedSizeAfterCallingNext(),
+ slidingWindowAggregationOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
slidingWindowAggregationOperator.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize + child.calculateRetainedSizeAfterCallingNext(),
+
slidingWindowAggregationOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ @Test
+ public void aggregationOperatorTest() throws IllegalPathException {
+ List<Operator> children = new ArrayList<>(4);
+ long expectedChildrenRetainedSize = 0L;
+ long expectedMaxRetainSize = 0L;
+ for (int i = 0; i < 4; i++) {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+
Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(64 *
1024L);
+ expectedChildrenRetainedSize += 64 * 1024L;
+ expectedMaxRetainSize += 64 * 1024L;
+ children.add(child);
+ }
+
+ MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1",
TSDataType.TEXT);
+ TypeProvider typeProvider = new TypeProvider();
+ typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+ typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+ List<AggregationDescriptor> aggregationDescriptors =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.FINAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.FINAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ List<Aggregator> aggregators = new ArrayList<>();
+ aggregationDescriptors.forEach(
+ o ->
+ aggregators.add(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ o.getAggregationType(),
measurementPath.getSeriesType(), true),
+ o.getStep())));
+
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0,
1000, 10, 10, true);
+ ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, true, false);
+ long maxReturnSize =
+ AggregationUtil.calculateMaxAggregationResultSize(
+ aggregationDescriptors, timeRangeIterator, true, typeProvider);
+
+ AggregationOperator aggregationOperator =
+ new AggregationOperator(
+ Mockito.mock(OperatorContext.class),
+ aggregators,
+ timeRangeIterator,
+ children,
+ maxReturnSize);
+
+ long expectedMaxReturnSize =
+ 100
+ * (512 * Byte.BYTES
+ + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize +
expectedChildrenRetainedSize,
+ aggregationOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
aggregationOperator.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize + expectedChildrenRetainedSize,
+ aggregationOperator.calculateRetainedSizeAfterCallingNext());
+ }
}