This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/MemoryControl by this push:
new fb2f684107 [To MemoryControl] [IOTDB-4081] Implemation of operators
related to aggregation for memory control (#6993)
fb2f684107 is described below
commit fb2f684107a54357d4a3d3bd1156289572f5f524
Author: liuminghui233 <[email protected]>
AuthorDate: Mon Aug 15 10:57:54 2022 +0800
[To MemoryControl] [IOTDB-4081] Implemation of operators related to
aggregation for memory control (#6993)
---
.../iotdb/db/metadata/LocalSchemaProcessor.java | 6 +-
.../db/metadata/rescon/SchemaResourceManager.java | 4 +-
...tatistics.java => SchemaStatisticsManager.java} | 12 +-
.../schemaregion/SchemaRegionMemoryImpl.java | 12 +-
.../schemaregion/SchemaRegionSchemaFileImpl.java | 12 +-
.../timerangeiterator/AggrWindowIterator.java | 23 ++
.../timerangeiterator/ITimeRangeIterator.java | 2 +
.../timerangeiterator/PreAggrWindowIterator.java | 17 +
.../PreAggrWindowWithNaturalMonthIterator.java | 16 +
.../SingleTimeWindowIterator.java | 5 +
.../db/mpp/execution/operator/AggregationUtil.java | 81 ++++
.../operator/process/AggregationOperator.java | 35 +-
.../process/RawDataAggregationOperator.java | 7 +-
.../process/SingleInputAggregationOperator.java | 31 +-
.../process/SlidingWindowAggregationOperator.java | 6 +-
.../AbstractSeriesAggregationScanOperator.java | 31 +-
.../AlignedSeriesAggregationScanOperator.java | 9 +-
.../source/SeriesAggregationScanOperator.java | 9 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 198 ++++++---
.../iotdb/db/mpp/statistics/StatisticsManager.java | 46 +++
.../iotdb/db/mpp/statistics/TimeseriesStats.java | 24 ++
.../iotdb/db/utils/datastructure/TimeSelector.java | 5 +
.../db/mpp/aggregation/TimeRangeIteratorTest.java | 2 +
.../operator/AggregationOperatorTest.java | 15 +-
.../AlignedSeriesAggregationScanOperatorTest.java | 11 +-
.../execution/operator/LastQueryOperatorTest.java | 18 +-
.../operator/LastQuerySortOperatorTest.java | 18 +-
.../mpp/execution/operator/OperatorMemoryTest.java | 460 +++++++++++++++++++++
.../operator/RawDataAggregationOperatorTest.java | 5 +-
.../SeriesAggregationScanOperatorTest.java | 6 +-
.../SlidingWindowAggregationOperatorTest.java | 10 +-
.../operator/UpdateLastCacheOperatorTest.java | 6 +-
32 files changed, 1018 insertions(+), 124 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 32518a6239..fafd13ddd9 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.Template;
@@ -466,7 +466,7 @@ public class LocalSchemaProcessor {
// todo this is for test assistance, refactor this to support massive
timeseries
if (pathPattern.getFullPath().equals("root.**")
&& TemplateManager.getInstance().getAllTemplateName().isEmpty()) {
- return (int) TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+ return (int)
SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
}
int count = 0;
for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern,
isPrefixMatch)) {
@@ -1380,7 +1380,7 @@ public class LocalSchemaProcessor {
@TestOnly
public long getTotalSeriesNumber() {
- return TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+ return SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
}
/**
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
index 0fe18ac955..c5df8ff9a7 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
@@ -30,7 +30,7 @@ public class SchemaResourceManager {
private SchemaResourceManager() {}
public static void initSchemaResource() {
- TimeseriesStatistics.getInstance().init();
+ SchemaStatisticsManager.getInstance().init();
MemoryStatistics.getInstance().init();
if (IoTDBDescriptor.getInstance()
.getConfig()
@@ -41,7 +41,7 @@ public class SchemaResourceManager {
}
public static void clearSchemaResource() {
- TimeseriesStatistics.getInstance().clear();
+ SchemaStatisticsManager.getInstance().clear();
MemoryStatistics.getInstance().clear();
if (IoTDBDescriptor.getInstance()
.getConfig()
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
similarity index 86%
rename from
server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
rename to
server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
index f8a1f21a20..6d63cdfe85 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
@@ -25,22 +25,22 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import java.util.concurrent.atomic.AtomicLong;
-public class TimeseriesStatistics {
+public class SchemaStatisticsManager {
private final AtomicLong totalSeriesNumber = new AtomicLong();
- private static class TimeseriesStatisticsHolder {
+ private static class SchemaStatisticsHolder {
- private TimeseriesStatisticsHolder() {
+ private SchemaStatisticsHolder() {
// allowed to do nothing
}
- private static final TimeseriesStatistics INSTANCE = new
TimeseriesStatistics();
+ private static final SchemaStatisticsManager INSTANCE = new
SchemaStatisticsManager();
}
/** we should not use this function in other place, but only in IoTDB class
*/
- public static TimeseriesStatistics getInstance() {
- return TimeseriesStatisticsHolder.INSTANCE;
+ public static SchemaStatisticsManager getInstance() {
+ return SchemaStatisticsHolder.INSTANCE;
}
public void init() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 452f2c4a5b..2e1ff4801b 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -52,7 +52,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGMemoryImpl;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
import org.apache.iotdb.db.metadata.tag.TagManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -169,7 +169,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
private boolean usingMLog = true;
private MLogWriter logWriter;
- private TimeseriesStatistics timeseriesStatistics =
TimeseriesStatistics.getInstance();
+ private SchemaStatisticsManager schemaStatisticsManager =
SchemaStatisticsManager.getInstance();
private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
private final IStorageGroupMNode storageGroupMNode;
@@ -455,7 +455,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
// collect all the LeafMNode in this schema region
List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
- timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+ schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
// drop triggers with no exceptions
TriggerEngine.drop(leafMNodes);
@@ -602,7 +602,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
mNodeCache.invalidate(path.getDevicePath());
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(1);
+ schemaStatisticsManager.addTimeseries(1);
// update tag index
if (offset != -1 && isRecovering) {
@@ -719,7 +719,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
mNodeCache.invalidate(prefixPath);
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+ schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
List<Long> tagOffsets = plan.getTagOffsets();
for (int i = 0; i < measurements.size(); i++) {
@@ -861,7 +861,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
mNodeCache.invalidate(node.getPartialPath());
- timeseriesStatistics.deleteTimeseries(1);
+ schemaStatisticsManager.deleteTimeseries(1);
return storageGroupPath;
}
// endregion
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 49fd1951b6..066b69902e 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -50,7 +50,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGCachedImpl;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
import org.apache.iotdb.db.metadata.tag.TagManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -163,7 +163,7 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
private File logFile;
private MLogWriter logWriter;
- private TimeseriesStatistics timeseriesStatistics =
TimeseriesStatistics.getInstance();
+ private SchemaStatisticsManager schemaStatisticsManager =
SchemaStatisticsManager.getInstance();
private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
private final IStorageGroupMNode storageGroupMNode;
@@ -414,7 +414,7 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
// collect all the LeafMNode in this schema region
List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
- timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+ schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
// drop triggers with no exceptions
TriggerEngine.drop(leafMNodes);
@@ -496,7 +496,7 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
mNodeCache.invalidate(path.getDevicePath());
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(1);
+ schemaStatisticsManager.addTimeseries(1);
// update tag index
if (offset != -1 && isRecovering) {
@@ -638,7 +638,7 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
mNodeCache.invalidate(prefixPath);
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+ schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
List<Long> tagOffsets = plan.getTagOffsets();
for (int i = 0; i < measurements.size(); i++) {
@@ -786,7 +786,7 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
mNodeCache.invalidate(node.getPartialPath());
- timeseriesStatistics.deleteTimeseries(1);
+ schemaStatisticsManager.deleteTimeseries(1);
return storageGroupPath;
}
// endregion
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
index f27c56223c..50f50e4175 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
@@ -176,4 +176,27 @@ public class AggrWindowIterator implements
ITimeRangeIterator {
public long currentOutputTime() {
return leftCRightO ? curTimeRange.getMin() : curTimeRange.getMax();
}
+
+ @Override
+ public long getTotalIntervalNum() {
+ long queryRange = endTime - startTime;
+ long intervalNum;
+
+ if (isSlidingStepByMonth) {
+ intervalNum = (long) Math.ceil(queryRange / (double) (slidingStep *
MS_TO_MONTH));
+ long retStartTime = DatetimeUtils.calcIntervalByMonth(startTime,
intervalNum * slidingStep);
+ while (retStartTime > endTime) {
+ intervalNum -= 1;
+ retStartTime = DatetimeUtils.calcIntervalByMonth(startTime,
intervalNum * slidingStep);
+ }
+ } else {
+ intervalNum = (long) Math.ceil(queryRange / (double) slidingStep);
+ }
+ return intervalNum;
+ }
+
+ public void reset() {
+ curTimeRange = null;
+ hasCachedTimeRange = false;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
index ccc999e810..47a6961fd7 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
@@ -55,4 +55,6 @@ public interface ITimeRangeIterator {
* @return minTime if leftCloseRightOpen, else maxTime.
*/
long currentOutputTime();
+
+ long getTotalIntervalNum();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java
index f7f6297ddb..c0b8c3b000 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java
@@ -173,4 +173,21 @@ public class PreAggrWindowIterator implements
ITimeRangeIterator {
public long currentOutputTime() {
return leftCRightO ? curTimeRange.getMin() : curTimeRange.getMax();
}
+
+ @Override
+ public long getTotalIntervalNum() {
+ long queryRange = endTime - startTime;
+ if (slidingStep >= interval || interval % slidingStep == 0) {
+ return (long) Math.ceil(queryRange / (double) slidingStep);
+ }
+
+ long interval1 = interval % slidingStep, interval2 = slidingStep -
interval % slidingStep;
+ long intervalNum = Math.floorDiv(queryRange, interval1 + interval2);
+ long tmpStartTime = startTime + intervalNum * (interval1 + interval2);
+ if (tmpStartTime + interval1 > endTime) {
+ return intervalNum * 2 + 1;
+ } else {
+ return intervalNum * 2 + 2;
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
index d8eede01ef..87f6cb8630 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
@@ -147,4 +147,20 @@ public class PreAggrWindowWithNaturalMonthIterator
implements ITimeRangeIterator
public long currentOutputTime() {
return leftCRightO ? curTimeRange.getMin() : curTimeRange.getMax();
}
+
+ @Override
+ public long getTotalIntervalNum() {
+ long tmpInterval = 0;
+ while (hasNextTimeRange()) {
+ tmpInterval++;
+ nextTimeRange();
+ }
+
+ curTimeRange = null;
+ timeBoundaryHeap.clear();
+ aggrWindowIterator.reset();
+ initHeap();
+
+ return tmpInterval;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
index b998557604..4802267afa 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
@@ -71,4 +71,9 @@ public class SingleTimeWindowIterator implements
ITimeRangeIterator {
public long currentOutputTime() {
return curTimeRange.getMin();
}
+
+ @Override
+ public long getTotalIntervalNum() {
+ return 1;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index 5465800553..564f21245e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -19,20 +19,37 @@
package org.apache.iotdb.db.mpp.execution.operator;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.SingleTimeWindowIterator;
import
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.statistics.StatisticsManager;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.utils.Pair;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static
org.apache.iotdb.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
public class AggregationUtil {
@@ -158,4 +175,68 @@ public class AggregationUtil {
}
return true;
}
+
+ public static long calculateMaxAggregationResultSize(
+ List<? extends AggregationDescriptor> aggregationDescriptors,
+ ITimeRangeIterator timeRangeIterator,
+ boolean isGroupByQuery,
+ TypeProvider typeProvider) {
+ long valueColumnsSizePerLine = 0;
+ for (AggregationDescriptor descriptor : aggregationDescriptors) {
+ List<TSDataType> outPutDataTypes =
+ descriptor.getOutputColumnNames().stream()
+ .map(typeProvider::getType)
+ .collect(Collectors.toList());
+ for (TSDataType tsDataType : outPutDataTypes) {
+ checkArgument(
+ descriptor.getInputExpressions().get(0) instanceof
TimeSeriesOperand,
+ "The input of aggregate function must be the original time
series.");
+ PartialPath inputSeriesPath =
+ ((TimeSeriesOperand)
descriptor.getInputExpressions().get(0)).getPath();
+ valueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType,
inputSeriesPath);
+ }
+ }
+
+ return isGroupByQuery
+ ? Math.min(
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ Math.min(
+
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
+ timeRangeIterator.getTotalIntervalNum())
+ * (TimeColumn.SIZE_IN_BYTES_PER_POSITION +
valueColumnsSizePerLine))
+ : valueColumnsSizePerLine;
+ }
+
+ public static long calculateMaxAggregationResultSizeForLastQuery(
+ List<Aggregator> aggregators, PartialPath inputSeriesPath) {
+ long valueColumnsSizePerLine = 0;
+ List<TSDataType> outPutDataTypes =
+ aggregators.stream()
+ .flatMap(aggregator -> Arrays.stream(aggregator.getOutputType()))
+ .collect(Collectors.toList());
+ for (TSDataType tsDataType : outPutDataTypes) {
+ valueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType,
inputSeriesPath);
+ }
+ return valueColumnsSizePerLine;
+ }
+
+ private static long getOutputColumnSizePerLine(
+ TSDataType tsDataType, PartialPath inputSeriesPath) {
+ switch (tsDataType) {
+ case INT32:
+ return IntColumn.SIZE_IN_BYTES_PER_POSITION;
+ case INT64:
+ return LongColumn.SIZE_IN_BYTES_PER_POSITION;
+ case FLOAT:
+ return FloatColumn.SIZE_IN_BYTES_PER_POSITION;
+ case DOUBLE:
+ return DoubleColumn.SIZE_IN_BYTES_PER_POSITION;
+ case BOOLEAN:
+ return BooleanColumn.SIZE_IN_BYTES_PER_POSITION;
+ case TEXT:
+ return
StatisticsManager.getInstance().getMaxBinarySizeInBytes(inputSeriesPath);
+ default:
+ throw new UnsupportedOperationException("Unknown data type " +
tsDataType);
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 8cf4aef8b0..7efe992cdd 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -37,7 +36,6 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.util.concurrent.Futures.successfulAsList;
import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
-import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
/**
* AggregationOperator can process the situation: aggregation of intermediate
aggregate result, it
@@ -62,16 +60,20 @@ public class AggregationOperator implements ProcessOperator
{
// using for building result tsBlock
private final TsBlockBuilder resultTsBlockBuilder;
+ private final long maxRetainedSize;
+ private final long childrenRetainedSize;
+ private final long maxReturnSize;
+
public AggregationOperator(
OperatorContext operatorContext,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
List<Operator> children,
- boolean ascending,
- GroupByTimeParameter groupByTimeParameter,
- boolean outputPartialTimeWindow) {
+ long maxReturnSize) {
this.operatorContext = operatorContext;
this.children = children;
this.aggregators = aggregators;
+ this.timeRangeIterator = timeRangeIterator;
this.inputOperatorsCount = children.size();
this.inputTsBlocks = new TsBlock[inputOperatorsCount];
@@ -80,14 +82,16 @@ public class AggregationOperator implements ProcessOperator
{
canCallNext[i] = false;
}
- this.timeRangeIterator =
- initTimeRangeIterator(groupByTimeParameter, ascending,
outputPartialTimeWindow);
-
List<TSDataType> dataTypes = new ArrayList<>();
for (Aggregator aggregator : aggregators) {
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+ this.maxRetainedSize =
children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
+ this.childrenRetainedSize =
+
children.stream().mapToLong(Operator::calculateRetainedSizeAfterCallingNext).sum();
+ this.maxReturnSize = maxReturnSize;
}
@Override
@@ -95,6 +99,21 @@ public class AggregationOperator implements ProcessOperator {
return operatorContext;
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ return maxReturnSize + maxRetainedSize + childrenRetainedSize;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return maxRetainedSize + childrenRetainedSize;
+ }
+
@Override
public ListenableFuture<?> isBlocked() {
List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 88fb5fe20a..d95968850d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -45,10 +45,11 @@ public class RawDataAggregationOperator extends
SingleInputAggregationOperator {
public RawDataAggregationOperator(
OperatorContext operatorContext,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
Operator child,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter) {
- super(operatorContext, aggregators, child, ascending,
groupByTimeParameter, true);
+ long maxReturnSize) {
+ super(operatorContext, aggregators, child, ascending, timeRangeIterator,
maxReturnSize);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index c59de13988..1d746f1150 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -37,7 +36,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
-import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
public abstract class SingleInputAggregationOperator implements
ProcessOperator {
@@ -57,26 +55,30 @@ public abstract class SingleInputAggregationOperator
implements ProcessOperator
// using for building result tsBlock
protected final TsBlockBuilder resultTsBlockBuilder;
+ protected final long maxRetainedSize;
+ protected final long maxReturnSize;
+
public SingleInputAggregationOperator(
OperatorContext operatorContext,
List<Aggregator> aggregators,
Operator child,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter,
- boolean outputPartialTimeWindow) {
+ ITimeRangeIterator timeRangeIterator,
+ long maxReturnSize) {
this.operatorContext = operatorContext;
this.ascending = ascending;
this.child = child;
this.aggregators = aggregators;
-
- this.timeRangeIterator =
- initTimeRangeIterator(groupByTimeParameter, ascending,
outputPartialTimeWindow);
+ this.timeRangeIterator = timeRangeIterator;
List<TSDataType> dataTypes = new ArrayList<>();
for (Aggregator aggregator : aggregators) {
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+ this.maxRetainedSize = child.calculateMaxReturnSize();
+ this.maxReturnSize = maxReturnSize;
}
@Override
@@ -146,4 +148,19 @@ public abstract class SingleInputAggregationOperator
implements ProcessOperator
curTimeRange = null;
appendAggregationResult(resultTsBlockBuilder, aggregators,
timeRangeIterator);
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return maxReturnSize + maxRetainedSize +
child.calculateRetainedSizeAfterCallingNext();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 10303016c5..d73491d3e4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -40,10 +40,12 @@ public class SlidingWindowAggregationOperator extends
SingleInputAggregationOper
public SlidingWindowAggregationOperator(
OperatorContext operatorContext,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
Operator child,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter) {
- super(operatorContext, aggregators, child, ascending,
groupByTimeParameter, false);
+ GroupByTimeParameter groupByTimeParameter,
+ long maxReturnSize) {
+ super(operatorContext, aggregators, child, ascending, timeRangeIterator,
maxReturnSize);
checkArgument(
groupByTimeParameter != null,
"GroupByTimeParameter cannot be null in
SlidingWindowAggregationOperator");
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 2d7d671be9..e7b3f37885 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -40,7 +41,6 @@ import java.util.concurrent.TimeUnit;
import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
-import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
public abstract class AbstractSeriesAggregationScanOperator implements
DataSourceOperator {
@@ -68,14 +68,19 @@ public abstract class AbstractSeriesAggregationScanOperator
implements DataSourc
protected boolean finished = false;
+ private final long maxRetainedSize;
+ private final long maxReturnSize;
+
public AbstractSeriesAggregationScanOperator(
PlanNodeId sourceId,
OperatorContext context,
SeriesScanUtil seriesScanUtil,
int subSensorSize,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter) {
+ GroupByTimeParameter groupByTimeParameter,
+ long maxReturnSize) {
this.sourceId = sourceId;
this.operatorContext = context;
this.ascending = ascending;
@@ -83,14 +88,17 @@ public abstract class AbstractSeriesAggregationScanOperator
implements DataSourc
this.seriesScanUtil = seriesScanUtil;
this.subSensorSize = subSensorSize;
this.aggregators = aggregators;
-
- this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter,
ascending, true);
+ this.timeRangeIterator = timeRangeIterator;
List<TSDataType> dataTypes = new ArrayList<>();
for (Aggregator aggregator : aggregators) {
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+ this.maxRetainedSize =
+ (1L + subSensorSize) *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ this.maxReturnSize = maxReturnSize;
}
@Override
@@ -108,6 +116,21 @@ public abstract class
AbstractSeriesAggregationScanOperator implements DataSourc
seriesScanUtil.initQueryDataSource(dataSource);
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ return maxRetainedSize + maxReturnSize;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return maxRetainedSize;
+ }
+
@Override
public boolean hasNext() {
return timeRangeIterator.hasNextTimeRange();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
index f9fa1f8d52..2ad3eb1a83 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -37,9 +38,11 @@ public class AlignedSeriesAggregationScanOperator extends
AbstractSeriesAggregat
AlignedPath seriesPath,
OperatorContext context,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
Filter timeFilter,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter) {
+ GroupByTimeParameter groupByTimeParameter,
+ long maxReturnSize) {
super(
sourceId,
context,
@@ -52,7 +55,9 @@ public class AlignedSeriesAggregationScanOperator extends
AbstractSeriesAggregat
ascending),
seriesPath.getMeasurementList().size(),
aggregators,
+ timeRangeIterator,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ maxReturnSize);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
index 157f51f2fa..99b24e06a5 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -44,9 +45,11 @@ public class SeriesAggregationScanOperator extends
AbstractSeriesAggregationScan
Set<String> allSensors,
OperatorContext context,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
Filter timeFilter,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter) {
+ GroupByTimeParameter groupByTimeParameter,
+ long maxReturnSize) {
super(
sourceId,
context,
@@ -60,7 +63,9 @@ public class SeriesAggregationScanOperator extends
AbstractSeriesAggregationScan
ascending),
1,
aggregators,
+ timeRangeIterator,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ maxReturnSize);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 5c4570a1fe..0978a006f5 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -27,12 +27,14 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import
org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
+import
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
+import org.apache.iotdb.db.mpp.execution.operator.AggregationUtil;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
@@ -150,6 +152,7 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
@@ -185,6 +188,9 @@ import java.util.Objects;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSize;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static
org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
/** This Visitor is responsible for transferring PlanNode Tree to Operator
Tree */
@@ -272,6 +278,58 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
return seriesScanOperator;
}
+ @Override
+ public Operator visitSeriesAggregationScan(
+ SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
+ PartialPath seriesPath = node.getSeriesPath();
+ boolean ascending = node.getScanOrder() == Ordering.ASC;
+ OperatorContext operatorContext =
+ context
+ .getInstanceContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ SeriesAggregationScanOperator.class.getSimpleName());
+
+ List<AggregationDescriptor> aggregationDescriptors =
node.getAggregationDescriptorList();
+ List<Aggregator> aggregators = new ArrayList<>();
+ aggregationDescriptors.forEach(
+ o ->
+ aggregators.add(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ o.getAggregationType(),
node.getSeriesPath().getSeriesType(), ascending),
+ o.getStep())));
+
+ GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, true);
+ long maxReturnSize =
+ AggregationUtil.calculateMaxAggregationResultSize(
+ node.getAggregationDescriptorList(),
+ timeRangeIterator,
+ groupByTimeParameter != null,
+ context.getTypeProvider());
+
+ SeriesAggregationScanOperator aggregateScanOperator =
+ new SeriesAggregationScanOperator(
+ node.getPlanNodeId(),
+ seriesPath,
+ context.getAllSensors(seriesPath.getDevice(),
seriesPath.getMeasurement()),
+ operatorContext,
+ aggregators,
+ timeRangeIterator,
+ node.getTimeFilter(),
+ ascending,
+ node.getGroupByTimeParameter(),
+ maxReturnSize);
+
+ context.addSourceOperator(aggregateScanOperator);
+ context.addPath(seriesPath);
+ context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
+ return aggregateScanOperator;
+ }
+
@Override
public Operator visitAlignedSeriesAggregationScan(
AlignedSeriesAggregationScanNode node, LocalExecutionPlanContext
context) {
@@ -307,15 +365,27 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
Collections.singletonList(new InputLocation[] {new
InputLocation(0, seriesIndex)})));
}
+ GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, true);
+ long maxReturnSize =
+ AggregationUtil.calculateMaxAggregationResultSize(
+ node.getAggregationDescriptorList(),
+ timeRangeIterator,
+ groupByTimeParameter != null,
+ context.getTypeProvider());
+
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
node.getPlanNodeId(),
seriesPath,
operatorContext,
aggregators,
+ timeRangeIterator,
node.getTimeFilter(),
ascending,
- node.getGroupByTimeParameter());
+ groupByTimeParameter,
+ maxReturnSize);
context.addSourceOperator(seriesAggregationScanOperator);
context.addPath(seriesPath);
@@ -555,47 +625,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
return new NodePathsCountOperator(operatorContext, child);
}
- @Override
- public Operator visitSeriesAggregationScan(
- SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
- PartialPath seriesPath = node.getSeriesPath();
- boolean ascending = node.getScanOrder() == Ordering.ASC;
- OperatorContext operatorContext =
- context
- .getInstanceContext()
- .addOperatorContext(
- context.getNextOperatorId(),
- node.getPlanNodeId(),
- SeriesAggregationScanOperator.class.getSimpleName());
-
- List<Aggregator> aggregators = new ArrayList<>();
- node.getAggregationDescriptorList()
- .forEach(
- o ->
- aggregators.add(
- new Aggregator(
- AccumulatorFactory.createAccumulator(
- o.getAggregationType(),
- node.getSeriesPath().getSeriesType(),
- ascending),
- o.getStep())));
- SeriesAggregationScanOperator aggregateScanOperator =
- new SeriesAggregationScanOperator(
- node.getPlanNodeId(),
- seriesPath,
- context.getAllSensors(seriesPath.getDevice(),
seriesPath.getMeasurement()),
- operatorContext,
- aggregators,
- node.getTimeFilter(),
- ascending,
- node.getGroupByTimeParameter());
-
- context.addSourceOperator(aggregateScanOperator);
- context.addPath(seriesPath);
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
- return aggregateScanOperator;
- }
-
@Override
public Operator visitDeviceView(DeviceViewNode node,
LocalExecutionPlanContext context) {
OperatorContext operatorContext =
@@ -1016,7 +1045,8 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
Map<String, List<InputLocation>> layout = makeLayout(node);
- for (GroupByLevelDescriptor descriptor :
node.getGroupByLevelDescriptors()) {
+ List<GroupByLevelDescriptor> aggregationDescriptors =
node.getGroupByLevelDescriptors();
+ for (GroupByLevelDescriptor descriptor : aggregationDescriptors) {
List<InputLocation[]> inputLocationList =
calcInputLocationList(descriptor, layout);
TSDataType seriesDataType =
context
@@ -1038,9 +1068,19 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
AggregationOperator.class.getSimpleName());
+ GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, false);
+ long maxReturnSize =
+ calculateMaxAggregationResultSize(
+ aggregationDescriptors,
+ timeRangeIterator,
+ groupByTimeParameter != null,
+ context.getTypeProvider());
+
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
return new AggregationOperator(
- operatorContext, aggregators, children, ascending,
node.getGroupByTimeParameter(), false);
+ operatorContext, aggregators, timeRangeIterator, children,
maxReturnSize);
}
@Override
@@ -1060,7 +1100,8 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
Map<String, List<InputLocation>> layout = makeLayout(node);
- for (AggregationDescriptor descriptor :
node.getAggregationDescriptorList()) {
+ List<AggregationDescriptor> aggregationDescriptors =
node.getAggregationDescriptorList();
+ for (AggregationDescriptor descriptor : aggregationDescriptors) {
List<InputLocation[]> inputLocationList =
calcInputLocationList(descriptor, layout);
aggregators.add(
SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
@@ -1074,9 +1115,25 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
descriptor.getStep()));
}
+ GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, false);
+ long maxReturnSize =
+ calculateMaxAggregationResultSize(
+ aggregationDescriptors,
+ timeRangeIterator,
+ groupByTimeParameter != null,
+ context.getTypeProvider());
+
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
return new SlidingWindowAggregationOperator(
- operatorContext, aggregators, child, ascending,
node.getGroupByTimeParameter());
+ operatorContext,
+ aggregators,
+ timeRangeIterator,
+ child,
+ ascending,
+ groupByTimeParameter,
+ maxReturnSize);
}
@Override
@@ -1121,6 +1178,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
Map<String, List<InputLocation>> layout = makeLayout(node);
+ List<AggregationDescriptor> aggregationDescriptors =
node.getAggregationDescriptorList();
for (AggregationDescriptor descriptor :
node.getAggregationDescriptorList()) {
List<InputLocation[]> inputLocationList =
calcInputLocationList(descriptor, layout);
aggregators.add(
@@ -1136,6 +1194,8 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
inputLocationList));
}
boolean inputRaw =
node.getAggregationDescriptorList().get(0).getStep().isInputRaw();
+ GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+
if (inputRaw) {
checkArgument(children.size() == 1, "rawDataAggregateOperator can only
accept one input");
OperatorContext operatorContext =
@@ -1146,8 +1206,23 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
RawDataAggregationOperator.class.getSimpleName());
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
+
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, true);
+ long maxReturnSize =
+ calculateMaxAggregationResultSize(
+ aggregationDescriptors,
+ timeRangeIterator,
+ groupByTimeParameter != null,
+ context.getTypeProvider());
+
return new RawDataAggregationOperator(
- operatorContext, aggregators, children.get(0), ascending,
node.getGroupByTimeParameter());
+ operatorContext,
+ aggregators,
+ timeRangeIterator,
+ children.get(0),
+ ascending,
+ maxReturnSize);
} else {
OperatorContext operatorContext =
context
@@ -1156,9 +1231,19 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
AggregationOperator.class.getSimpleName());
+
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, true);
+ long maxReturnSize =
+ calculateMaxAggregationResultSize(
+ aggregationDescriptors,
+ timeRangeIterator,
+ groupByTimeParameter != null,
+ context.getTypeProvider());
+
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
return new AggregationOperator(
- operatorContext, aggregators, children, ascending,
node.getGroupByTimeParameter(), true);
+ operatorContext, aggregators, timeRangeIterator, children,
maxReturnSize);
}
}
@@ -1400,6 +1485,10 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
// last_time, last_value
List<Aggregator> aggregators =
LastQueryUtil.createAggregators(seriesPath.getSeriesType());
+ ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false,
false);
+ long maxReturnSize =
+ calculateMaxAggregationResultSizeForLastQuery(
+ aggregators, seriesPath.transformToPartialPath());
SeriesAggregationScanOperator seriesAggregationScanOperator =
new SeriesAggregationScanOperator(
@@ -1408,9 +1497,11 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getAllSensors(seriesPath.getDevice(),
seriesPath.getMeasurement()),
operatorContext,
aggregators,
+ timeRangeIterator,
context.getLastQueryTimeFilter(),
false,
- null);
+ null,
+ maxReturnSize);
context.addSourceOperator(seriesAggregationScanOperator);
context.addPath(seriesPath);
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
@@ -1479,15 +1570,22 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
// last_time, last_value
List<Aggregator> aggregators =
LastQueryUtil.createAggregators(seriesPath.getSchemaList().get(0).getType());
+ ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false,
false);
+ long maxReturnSize =
+ calculateMaxAggregationResultSizeForLastQuery(
+ aggregators, seriesPath.transformToPartialPath());
+
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
node.getPlanNodeId(),
seriesPath,
operatorContext,
aggregators,
+ timeRangeIterator,
context.getLastQueryTimeFilter(),
false,
- null);
+ null,
+ maxReturnSize);
context.addSourceOperator(seriesAggregationScanOperator);
context.addPath(seriesPath);
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
new file mode 100644
index 0000000000..44d5fc1c66
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.statistics;
+
+import org.apache.iotdb.commons.path.PartialPath;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+public class StatisticsManager {
+
+ private final Map<PartialPath, TimeseriesStats> seriesToStatsMap =
Maps.newConcurrentMap();
+
+ public long getMaxBinarySizeInBytes(PartialPath path) {
+ return 512 * Byte.BYTES;
+ }
+
+ public static StatisticsManager getInstance() {
+ return StatisticsManager.StatisticsManagerHelper.INSTANCE;
+ }
+
+ private static class StatisticsManagerHelper {
+
+ private static final StatisticsManager INSTANCE = new StatisticsManager();
+
+ private StatisticsManagerHelper() {}
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
new file mode 100644
index 0000000000..509341d3d5
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.statistics;
+
+public class TimeseriesStats {
+ // TODO collect time series statistics
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
index 6233fa04ce..e25f592aa8 100644
---
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
+++
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
@@ -166,6 +166,11 @@ public class TimeSelector {
return smallerChildIndex;
}
+ public void clear() {
+ heapSize = 0;
+ lastTime = Long.MIN_VALUE;
+ }
+
@Override
public String toString() {
return Arrays.toString(this.timeHeap);
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
index 88e4b6bf16..1d3f3ccf79 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
@@ -295,6 +295,8 @@ public class TimeRangeIteratorTest {
}
private void checkRes(ITimeRangeIterator timeRangeIterator, String[] res) {
+ Assert.assertEquals(res.length, timeRangeIterator.getTotalIntervalNum());
+
boolean isAscending = timeRangeIterator.isAscending();
int cnt = isAscending ? 0 : res.length - 1;
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
index 233965aa3e..85e65849a6 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
@@ -61,6 +61,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
public class AggregationOperatorTest {
@@ -319,9 +321,11 @@ public class AggregationOperatorTest {
Collections.singleton("sensor0"),
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, true, true),
null,
true,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
List<TsFileResource> seqResources1 = new ArrayList<>();
List<TsFileResource> unSeqResources1 = new ArrayList<>();
seqResources1.add(seqResources.get(0));
@@ -341,9 +345,11 @@ public class AggregationOperatorTest {
Collections.singleton("sensor0"),
fragmentInstanceContext.getOperatorContexts().get(1),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, true, true),
null,
true,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
List<TsFileResource> seqResources2 = new ArrayList<>();
List<TsFileResource> unSeqResources2 = new ArrayList<>();
seqResources2.add(seqResources.get(2));
@@ -368,9 +374,8 @@ public class AggregationOperatorTest {
return new AggregationOperator(
fragmentInstanceContext.getOperatorContexts().get(2),
finalAggregators,
+ initTimeRangeIterator(groupByTimeParameter, true, true),
children,
- true,
- groupByTimeParameter,
- true);
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
index 2277299102..a2e989f47e 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
@@ -63,6 +63,8 @@ import java.util.stream.Collectors;
import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -621,10 +623,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
1, planNodeId, SeriesScanOperator.class.getSimpleName());
fragmentInstanceContext
.getOperatorContexts()
- .forEach(
- operatorContext -> {
- operatorContext.setMaxRunTime(TEST_TIME_SLICE);
- });
+ .forEach(operatorContext ->
operatorContext.setMaxRunTime(TEST_TIME_SLICE));
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
@@ -632,9 +631,11 @@ public class AlignedSeriesAggregationScanOperatorTest {
alignedPath,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, true),
timeFilter,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
return seriesAggregationScanOperator;
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
index d914ec3ee7..4864fed39b 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
@@ -56,6 +56,8 @@ import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -133,9 +135,11 @@ public class LastQueryOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators1,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator1.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
@@ -155,9 +159,11 @@ public class LastQueryOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(2),
aggregators2,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator2.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
@@ -252,9 +258,11 @@ public class LastQueryOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators1,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator1.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
@@ -274,9 +282,11 @@ public class LastQueryOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(2),
aggregators2,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator2.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
index 2785f1d803..6dc84c071b 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
@@ -58,6 +58,8 @@ import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -135,9 +137,11 @@ public class LastQuerySortOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators1,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator1.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
@@ -157,9 +161,11 @@ public class LastQuerySortOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(2),
aggregators2,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator2.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
@@ -255,9 +261,11 @@ public class LastQuerySortOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators1,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator1.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
@@ -277,9 +285,11 @@ public class LastQuerySortOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(2),
aggregators2,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator2.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index d0598e6494..085f9829c6 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -22,11 +22,15 @@ import
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
@@ -34,6 +38,8 @@ import
org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperat
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
import
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
@@ -63,20 +69,29 @@ import
org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesSchemaScanOpe
import
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
import
org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticAdditionColumnTransformer;
import
org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessEqualColumnTransformer;
import
org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
import
org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.read.common.type.BooleanType;
import org.apache.iotdb.tsfile.read.common.type.LongType;
import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
@@ -95,6 +110,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static
org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator.MAP_NODE_RETRAINED_SIZE;
import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
@@ -1073,4 +1089,448 @@ public class OperatorMemoryTest {
assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize());
assertEquals(expectedRetainedSize,
operator.calculateRetainedSizeAfterCallingNext());
}
+
+ @Test
+ public void seriesAggregationScanOperatorTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ try {
+ MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1",
TSDataType.TEXT);
+ TypeProvider typeProvider = new TypeProvider();
+ typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+ typeProvider.setType("min_time(root.sg.d1.s1)", TSDataType.INT64);
+ typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+ // case1: without group by, step is SINGLE
+ List<AggregationDescriptor> aggregationDescriptors1 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator1 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors1,
+ null,
+ typeProvider);
+
+ long expectedMaxReturnSize = 512 * Byte.BYTES +
LongColumn.SIZE_IN_BYTES_PER_POSITION;
+ long expectedMaxRetainSize =
+ 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator1.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
seriesAggregationScanOperator1.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+
seriesAggregationScanOperator1.calculateRetainedSizeAfterCallingNext());
+
+ // case2: without group by, step is PARTIAL
+ List<AggregationDescriptor> aggregationDescriptors2 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.PARTIAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.PARTIAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator2 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors2,
+ null,
+ typeProvider);
+
+ expectedMaxReturnSize = 512 * Byte.BYTES + 2 *
LongColumn.SIZE_IN_BYTES_PER_POSITION;
+ expectedMaxRetainSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator2.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
seriesAggregationScanOperator2.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+
seriesAggregationScanOperator2.calculateRetainedSizeAfterCallingNext());
+
+ long maxTsBlockLineNumber =
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+
+ // case3: with group by, total window num < 1000
+ GroupByTimeParameter groupByTimeParameter =
+ new GroupByTimeParameter(
+ 0,
+ 2 * maxTsBlockLineNumber,
+ maxTsBlockLineNumber / 100,
+ maxTsBlockLineNumber / 100,
+ true);
+ List<AggregationDescriptor> aggregationDescriptors3 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator3 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors3,
+ groupByTimeParameter,
+ typeProvider);
+
+ expectedMaxReturnSize =
+ 200
+ * (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ + 512 * Byte.BYTES
+ + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+ expectedMaxRetainSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator3.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
seriesAggregationScanOperator3.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+
seriesAggregationScanOperator3.calculateRetainedSizeAfterCallingNext());
+
+ // case4: with group by, total window num > 1000
+ groupByTimeParameter = new GroupByTimeParameter(0, 2 *
maxTsBlockLineNumber, 1, 1, true);
+ List<AggregationDescriptor> aggregationDescriptors4 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator4 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors4,
+ groupByTimeParameter,
+ typeProvider);
+
+ expectedMaxReturnSize =
+ maxTsBlockLineNumber
+ * (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ + 512 * Byte.BYTES
+ + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+ expectedMaxRetainSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator4.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
seriesAggregationScanOperator4.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+
seriesAggregationScanOperator4.calculateRetainedSizeAfterCallingNext());
+
+ // case5: over DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES
+ groupByTimeParameter = new GroupByTimeParameter(0, 2 *
maxTsBlockLineNumber, 1, 1, true);
+ List<AggregationDescriptor> aggregationDescriptors5 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator5 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors5,
+ groupByTimeParameter,
+ typeProvider);
+
+ expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ expectedMaxRetainSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator5.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
seriesAggregationScanOperator5.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+
seriesAggregationScanOperator5.calculateRetainedSizeAfterCallingNext());
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ private SeriesAggregationScanOperator createSeriesAggregationScanOperator(
+ ExecutorService instanceNotificationExecutor,
+ MeasurementPath measurementPath,
+ List<AggregationDescriptor> aggregationDescriptors,
+ GroupByTimeParameter groupByTimeParameter,
+ TypeProvider typeProvider)
+ throws IllegalPathException {
+ Set<String> allSensors = Sets.newHashSet("s1");
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId, SeriesScanOperator.class.getSimpleName());
+
+ List<Aggregator> aggregators = new ArrayList<>();
+ aggregationDescriptors.forEach(
+ o ->
+ aggregators.add(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ o.getAggregationType(),
measurementPath.getSeriesType(), true),
+ o.getStep())));
+
+ ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, true, true);
+ long maxReturnSize =
+ AggregationUtil.calculateMaxAggregationResultSize(
+ aggregationDescriptors, timeRangeIterator, groupByTimeParameter !=
null, typeProvider);
+
+ return new SeriesAggregationScanOperator(
+ planNodeId,
+ measurementPath,
+ allSensors,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ aggregators,
+ timeRangeIterator,
+ null,
+ true,
+ groupByTimeParameter,
+ maxReturnSize);
+ }
+
+ @Test
+ public void rawDataAggregationOperatorTest() throws IllegalPathException {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+
Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+ MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1",
TSDataType.TEXT);
+ TypeProvider typeProvider = new TypeProvider();
+ typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+ typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+ List<AggregationDescriptor> aggregationDescriptors =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.FINAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.FINAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ List<Aggregator> aggregators = new ArrayList<>();
+ aggregationDescriptors.forEach(
+ o ->
+ aggregators.add(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ o.getAggregationType(),
measurementPath.getSeriesType(), true),
+ o.getStep())));
+
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0,
1000, 10, 10, true);
+ ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, true, false);
+ long maxReturnSize =
+ AggregationUtil.calculateMaxAggregationResultSize(
+ aggregationDescriptors, timeRangeIterator, true, typeProvider);
+
+ RawDataAggregationOperator rawDataAggregationOperator =
+ new RawDataAggregationOperator(
+ Mockito.mock(OperatorContext.class),
+ aggregators,
+ timeRangeIterator,
+ child,
+ true,
+ maxReturnSize);
+
+ long expectedMaxReturnSize =
+ 100
+ * (512 * Byte.BYTES
+ + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+ long expectedMaxRetainSize = child.calculateMaxReturnSize();
+
+ assertEquals(
+ expectedMaxReturnSize
+ + expectedMaxRetainSize
+ + child.calculateRetainedSizeAfterCallingNext(),
+ rawDataAggregationOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
rawDataAggregationOperator.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize + child.calculateRetainedSizeAfterCallingNext(),
+ rawDataAggregationOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ @Test
+ public void slidingWindowAggregationOperatorTest() throws
IllegalPathException {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+
Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+ MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1",
TSDataType.TEXT);
+ TypeProvider typeProvider = new TypeProvider();
+ typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+ typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+ List<AggregationDescriptor> aggregationDescriptors =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.FINAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.FINAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ List<Aggregator> aggregators = new ArrayList<>();
+ aggregationDescriptors.forEach(
+ o ->
+ aggregators.add(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ o.getAggregationType(),
measurementPath.getSeriesType(), true),
+ o.getStep())));
+
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0,
1000, 10, 5, true);
+ ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, true, false);
+ long maxReturnSize =
+ AggregationUtil.calculateMaxAggregationResultSize(
+ aggregationDescriptors, timeRangeIterator, true, typeProvider);
+
+ SlidingWindowAggregationOperator slidingWindowAggregationOperator =
+ new SlidingWindowAggregationOperator(
+ Mockito.mock(OperatorContext.class),
+ aggregators,
+ timeRangeIterator,
+ child,
+ true,
+ groupByTimeParameter,
+ maxReturnSize);
+
+ long expectedMaxReturnSize =
+ 200
+ * (512 * Byte.BYTES
+ + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+ long expectedMaxRetainSize = child.calculateMaxReturnSize();
+
+ assertEquals(
+ expectedMaxReturnSize
+ + expectedMaxRetainSize
+ + child.calculateRetainedSizeAfterCallingNext(),
+ slidingWindowAggregationOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
slidingWindowAggregationOperator.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize + child.calculateRetainedSizeAfterCallingNext(),
+
slidingWindowAggregationOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ @Test
+ public void aggregationOperatorTest() throws IllegalPathException {
+ List<Operator> children = new ArrayList<>(4);
+ long expectedChildrenRetainedSize = 0L;
+ long expectedMaxRetainSize = 0L;
+ for (int i = 0; i < 4; i++) {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+
Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(64 *
1024L);
+ expectedChildrenRetainedSize += 64 * 1024L;
+ expectedMaxRetainSize += 64 * 1024L;
+ children.add(child);
+ }
+
+ MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1",
TSDataType.TEXT);
+ TypeProvider typeProvider = new TypeProvider();
+ typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+ typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+ List<AggregationDescriptor> aggregationDescriptors =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.FINAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.FINAL,
+ Collections.singletonList(new
TimeSeriesOperand(measurementPath))));
+
+ List<Aggregator> aggregators = new ArrayList<>();
+ aggregationDescriptors.forEach(
+ o ->
+ aggregators.add(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ o.getAggregationType(),
measurementPath.getSeriesType(), true),
+ o.getStep())));
+
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0,
1000, 10, 10, true);
+ ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, true, false);
+ long maxReturnSize =
+ AggregationUtil.calculateMaxAggregationResultSize(
+ aggregationDescriptors, timeRangeIterator, true, typeProvider);
+
+ AggregationOperator aggregationOperator =
+ new AggregationOperator(
+ Mockito.mock(OperatorContext.class),
+ aggregators,
+ timeRangeIterator,
+ children,
+ maxReturnSize);
+
+ long expectedMaxReturnSize =
+ 100
+ * (512 * Byte.BYTES
+ + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize +
expectedChildrenRetainedSize,
+ aggregationOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize,
aggregationOperator.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize + expectedChildrenRetainedSize,
+ aggregationOperator.calculateRetainedSizeAfterCallingNext());
+ }
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index 6a9902f24b..f25b0d2fcd 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -64,6 +64,8 @@ import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
public class RawDataAggregationOperatorTest {
@@ -381,8 +383,9 @@ public class RawDataAggregationOperatorTest {
return new RawDataAggregationOperator(
fragmentInstanceContext.getOperatorContexts().get(3),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, true, true),
timeJoinOperator,
true,
- groupByTimeParameter);
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
index 71fd167404..f3b37b83f7 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -59,6 +59,8 @@ import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
public class SeriesAggregationScanOperatorTest {
@@ -515,9 +517,11 @@ public class SeriesAggregationScanOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, true),
timeFilter,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
return seriesAggregationScanOperator;
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
index 81ab99e1ff..a0ed242e63 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
@@ -61,6 +61,8 @@ import java.util.stream.Collectors;
import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public class SlidingWindowAggregationOperatorTest {
@@ -232,9 +234,11 @@ public class SlidingWindowAggregationOperatorTest {
Collections.singleton("sensor0"),
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, true),
null,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
@@ -254,8 +258,10 @@ public class SlidingWindowAggregationOperatorTest {
return new SlidingWindowAggregationOperator(
fragmentInstanceContext.getOperatorContexts().get(1),
finalAggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, false),
seriesAggregationScanOperator,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
index 34b9b8fa87..bff57278b1 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -56,6 +56,8 @@ import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -211,9 +213,11 @@ public class UpdateLastCacheOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, true),
timeFilter,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));