This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5e368d4eef [IOTDB-3081] Implementation of
SlidingWindowAggregationOperator (#5986)
5e368d4eef is described below
commit 5e368d4eef6925c616821bd2543092079c676ba4
Author: liuminghui233 <[email protected]>
AuthorDate: Thu May 26 19:34:41 2022 +0800
[IOTDB-3081] Implementation of SlidingWindowAggregationOperator (#5986)
---
.../iotdb/db/mpp/aggregation/Aggregator.java | 22 +-
.../iotdb/db/mpp/aggregation/AvgAccumulator.java | 6 +
.../db/mpp/aggregation/MinTimeDescAccumulator.java | 2 +-
.../EmptyQueueSlidingWindowAggregator.java | 58 +++++
.../MonotonicQueueSlidingWindowAggregator.java | 76 +++++++
.../NormalQueueSlidingWindowAggregator.java | 61 ++++++
.../slidingwindow/SlidingWindowAggregator.java | 143 ++++++++++++
.../SlidingWindowAggregatorFactory.java | 149 +++++++++++++
.../SmoothQueueSlidingWindowAggregator.java | 57 +++++
.../operator/process/AggregationOperator.java | 4 +-
.../process/RawDataAggregationOperator.java | 6 +-
.../process/SlidingWindowAggregationOperator.java | 144 ++++++++++++
.../AlignedSeriesAggregationScanOperator.java | 12 +-
.../source/SeriesAggregationScanOperator.java | 16 +-
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 75 +++++--
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 73 +++---
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 10 +-
.../mpp/plan/planner/plan/node/PlanNodeType.java | 6 +-
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 4 +-
.../planner/plan/node/process/AggregationNode.java | 46 ++--
...Node.java => SlidingWindowAggregationNode.java} | 67 +++---
.../apache/iotdb/db/utils/ErrorHandlingUtils.java | 3 +
.../apache/iotdb/db/utils/TypeInferenceUtils.java | 25 +++
.../operator/AggregationOperatorTest.java | 2 +-
.../SlidingWindowAggregationOperatorTest.java | 244 +++++++++++++++++++++
.../db/mpp/plan/plan/QueryLogicalPlanUtil.java | 12 +-
.../node/process/AggregationNodeSerdeTest.java | 3 +-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
28 files changed, 1183 insertions(+), 144 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index b9ccc4c7b3..d99d71a153 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -34,12 +34,12 @@ import static
com.google.common.base.Preconditions.checkArgument;
public class Aggregator {
- private final Accumulator accumulator;
+ protected final Accumulator accumulator;
// In some intermediate result input, inputLocation[] should include two
columns
- private List<InputLocation[]> inputLocationList;
- private final AggregationStep step;
+ protected List<InputLocation[]> inputLocationList;
+ protected final AggregationStep step;
- private TimeRange timeRange = new TimeRange(0, Long.MAX_VALUE);
+ protected TimeRange curTimeRange = new TimeRange(0, Long.MAX_VALUE);
// Used for SeriesAggregateScanOperator
public Aggregator(Accumulator accumulator, AggregationStep step) {
@@ -61,7 +61,7 @@ public class Aggregator {
step.isInputRaw(),
"Step in SeriesAggregateScanOperator and RawDataAggregateOperator can
only process raw input");
if (inputLocationList == null) {
- accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange);
+ accumulator.addInput(tsBlock.getTimeAndValueColumn(0), curTimeRange);
} else {
for (InputLocation[] inputLocations : inputLocationList) {
checkArgument(
@@ -70,7 +70,7 @@ public class Aggregator {
Column[] timeValueColumn = new Column[2];
timeValueColumn[0] = tsBlock.getTimeColumn();
timeValueColumn[1] =
tsBlock.getColumn(inputLocations[0].getValueColumnIndex());
- accumulator.addInput(timeValueColumn, timeRange);
+ accumulator.addInput(timeValueColumn, curTimeRange);
}
}
}
@@ -129,7 +129,7 @@ public class Aggregator {
}
public void reset() {
- timeRange = new TimeRange(0, Long.MAX_VALUE);
+ curTimeRange = new TimeRange(0, Long.MAX_VALUE);
accumulator.reset();
}
@@ -137,11 +137,11 @@ public class Aggregator {
return accumulator.hasFinalResult();
}
- public void setTimeRange(TimeRange timeRange) {
- this.timeRange = timeRange;
+ public void updateTimeRange(TimeRange curTimeRange) {
+ this.curTimeRange = curTimeRange;
}
- public TimeRange getTimeRange() {
- return timeRange;
+ public TimeRange getCurTimeRange() {
+ return curTimeRange;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
index 09dcf14628..4f1e9cd8a6 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
@@ -74,6 +74,9 @@ public class AvgAccumulator implements Accumulator {
initResult = true;
countValue += partialResult[0].getLong(0);
sumValue += partialResult[1].getDouble(0);
+ if (countValue == 0) {
+ initResult = false;
+ }
}
@Override
@@ -85,6 +88,9 @@ public class AvgAccumulator implements Accumulator {
} else {
sumValue += statistics.getSumDoubleValue();
}
+ if (countValue == 0) {
+ initResult = false;
+ }
}
// Set sumValue to finalResult and keep countValue equals to 1
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
index 615a05686c..8fe815eb7c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
@@ -28,7 +28,7 @@ public class MinTimeDescAccumulator extends
MinTimeAccumulator {
public void addInput(Column[] column, TimeRange timeRange) {
for (int i = 0; i < column[0].getPositionCount(); i++) {
long curTime = column[0].getLong(i);
- if (timeRange.contains(curTime)) {
+ if (timeRange.contains(curTime) && !column[1].isNull(i)) {
updateMinTime(curTime);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/EmptyQueueSlidingWindowAggregator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/EmptyQueueSlidingWindowAggregator.java
new file mode 100644
index 0000000000..52b009e922
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/EmptyQueueSlidingWindowAggregator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.slidingwindow;
+
+import org.apache.iotdb.db.mpp.aggregation.Accumulator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+
+import java.util.List;
+
+/**
+ * When calculating MAX_TIME and LAST_VALUE (the MIN_TIME and FIRST_VALUE in
descending order), the
+ * aggregation result always appears in the most recent pre-aggregation
result. So, we do not need
+ * to cache the previous pre-aggregated results in the queue.
+ */
+public class EmptyQueueSlidingWindowAggregator extends SlidingWindowAggregator
{
+
+ private long lastTime;
+
+ public EmptyQueueSlidingWindowAggregator(
+ Accumulator accumulator, List<InputLocation[]> inputLocationList,
AggregationStep step) {
+ super(accumulator, inputLocationList, step);
+ }
+
+ @Override
+ protected void evictingExpiredValue() {
+ if (lastTime != -1 && !curTimeRange.contains(lastTime)) {
+ this.accumulator.reset();
+ lastTime = -1;
+ }
+ }
+
+ @Override
+ public void processPartialResult(PartialAggregationResult partialResult) {
+ if (!partialResult.isNull()) {
+ this.accumulator.reset();
+ this.accumulator.addIntermediate(partialResult.getPartialResult());
+ lastTime = partialResult.getTime();
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/MonotonicQueueSlidingWindowAggregator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/MonotonicQueueSlidingWindowAggregator.java
new file mode 100644
index 0000000000..7706edfb2c
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/MonotonicQueueSlidingWindowAggregator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.slidingwindow;
+
+import org.apache.iotdb.db.mpp.aggregation.Accumulator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * When computing MAX_VALUE, MIN_VALUE, EXTREME, we only add partial
aggregation results that
+ * maintain monotonicity to queue. The aggregation result always appears at
the head of the queue.
+ */
+public class MonotonicQueueSlidingWindowAggregator extends
SlidingWindowAggregator {
+
+ private final Comparator<Column> comparator;
+
+ public MonotonicQueueSlidingWindowAggregator(
+ Accumulator accumulator,
+ List<InputLocation[]> inputLocationList,
+ AggregationStep step,
+ Comparator<Column> comparator) {
+ super(accumulator, inputLocationList, step);
+ this.comparator = comparator;
+ }
+
+ @Override
+ protected void evictingExpiredValue() {
+ while (!deque.isEmpty() &&
!curTimeRange.contains(deque.getFirst().getTime())) {
+ deque.removeFirst();
+ }
+ this.accumulator.reset();
+ if (!deque.isEmpty()) {
+ this.accumulator.addIntermediate(deque.getFirst().getPartialResult());
+ }
+ }
+
+ @Override
+ public void processPartialResult(PartialAggregationResult partialResult) {
+ if (partialResult.isNull()) {
+ return;
+ }
+
+ while (!deque.isEmpty()
+ && comparator.compare(
+ partialResult.getPartialResult()[0],
deque.getLast().getPartialResult()[0])
+ > 0) {
+ deque.removeLast();
+ }
+ deque.addLast(partialResult);
+ this.accumulator.reset();
+ if (!deque.isEmpty()) {
+ this.accumulator.addIntermediate(deque.getFirst().getPartialResult());
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/NormalQueueSlidingWindowAggregator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/NormalQueueSlidingWindowAggregator.java
new file mode 100644
index 0000000000..eecaa8cffe
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/NormalQueueSlidingWindowAggregator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.slidingwindow;
+
+import org.apache.iotdb.db.mpp.aggregation.Accumulator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+
+import java.util.List;
+
+/**
+ * When calculating MIN_TIME and FIRST_VALUE (MAX_TIME and LAST_VALUE in
descending order), the
+ * aggregated result always appears at the head of the queue. We need to cache
all pre-aggregated
+ * results in the queue.
+ */
+public class NormalQueueSlidingWindowAggregator extends
SlidingWindowAggregator {
+
+ public NormalQueueSlidingWindowAggregator(
+ Accumulator accumulator, List<InputLocation[]> inputLocationList,
AggregationStep step) {
+ super(accumulator, inputLocationList, step);
+ }
+
+ @Override
+ protected void evictingExpiredValue() {
+ while (!deque.isEmpty() &&
!curTimeRange.contains(deque.getFirst().getTime())) {
+ deque.removeFirst();
+ }
+ this.accumulator.reset();
+ if (!deque.isEmpty()) {
+ this.accumulator.addIntermediate(deque.getFirst().getPartialResult());
+ }
+ }
+
+ @Override
+ public void processPartialResult(PartialAggregationResult partialResult) {
+ if (!partialResult.isNull()) {
+ deque.addLast(partialResult);
+ }
+ this.accumulator.reset();
+ if (!deque.isEmpty()) {
+ this.accumulator.addIntermediate(deque.getFirst().getPartialResult());
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
new file mode 100644
index 0000000000..5ab0d0195e
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.slidingwindow;
+
+import org.apache.iotdb.db.mpp.aggregation.Accumulator;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public abstract class SlidingWindowAggregator extends Aggregator {
+
+ // cached partial aggregation result of pre-aggregate windows
+ protected Deque<PartialAggregationResult> deque;
+
+ public SlidingWindowAggregator(
+ Accumulator accumulator, List<InputLocation[]> inputLocationList,
AggregationStep step) {
+ super(accumulator, step, inputLocationList);
+ this.deque = new LinkedList<>();
+ }
+
+ @Override
+ public void processTsBlock(TsBlock tsBlock) {
+ checkArgument(
+ step.isInputPartial(),
+ "Step in SlidingWindowAggregationOperator can only process partial
result");
+ TimeColumn timeColumn = tsBlock.getTimeColumn();
+ Column[] valueColumn = new Column[inputLocationList.get(0).length];
+ for (int i = 0; i < inputLocationList.get(0).length; i++) {
+ InputLocation inputLocation = inputLocationList.get(0)[i];
+ checkArgument(
+ inputLocation.getTsBlockIndex() == 0,
+ "SlidingWindowAggregationOperator can only process one tsBlock
input.");
+ valueColumn[i] = tsBlock.getColumn(inputLocation.getValueColumnIndex());
+ }
+ processPartialResult(new PartialAggregationResult(timeColumn,
valueColumn));
+ }
+
+ @Override
+ public void updateTimeRange(TimeRange curTimeRange) {
+ this.curTimeRange = curTimeRange;
+ evictingExpiredValue();
+ }
+
+ /** evicting expired element in queue and reset expired aggregateResult */
+ protected abstract void evictingExpiredValue();
+
+ /** update queue and aggregateResult */
+ public abstract void processPartialResult(PartialAggregationResult
partialResult);
+
+ protected static class PartialAggregationResult {
+
+ private final TimeColumn timeColumn;
+ private final Column[] partialResultColumns;
+
+ public PartialAggregationResult(TimeColumn timeColumn, Column[]
partialResultColumns) {
+ this.timeColumn = timeColumn;
+ this.partialResultColumns = partialResultColumns;
+ }
+
+ public boolean isNull() {
+ return partialResultColumns[0].isNull(0);
+ }
+
+ public long getTime() {
+ return timeColumn.getLong(0);
+ }
+
+ public Column[] getPartialResult() {
+ return partialResultColumns;
+ }
+
+ public List<TSDataType> getDataTypes() {
+ return Arrays.stream(partialResultColumns)
+ .sequential()
+ .map(Column::getDataType)
+ .collect(Collectors.toList());
+ }
+
+ public Column[] opposite() {
+ List<TSDataType> dataTypes = getDataTypes();
+ TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(dataTypes);
+ ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+ Column[] results = new Column[partialResultColumns.length];
+ for (int i = 0; i < partialResultColumns.length; i++) {
+ switch (dataTypes.get(i)) {
+ case INT32:
+ columnBuilders[i].writeInt(partialResultColumns[i].getInt(0) * -1);
+ break;
+ case INT64:
+ columnBuilders[i].writeLong(partialResultColumns[i].getLong(0) *
-1);
+ break;
+ case FLOAT:
+ columnBuilders[i].writeFloat(partialResultColumns[i].getFloat(0) *
-1);
+ break;
+ case DOUBLE:
+ columnBuilders[i].writeDouble(partialResultColumns[i].getDouble(0)
* -1);
+ break;
+ case TEXT:
+ case BOOLEAN:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in opposite : %s",
dataTypes.get(i)));
+ default:
+ throw new IllegalArgumentException("Unknown data type: " +
dataTypes.get(i));
+ }
+ results[i] = columnBuilders[i].build();
+ }
+ return results;
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
new file mode 100644
index 0000000000..a6dc525717
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.slidingwindow;
+
+import org.apache.iotdb.db.mpp.aggregation.Accumulator;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SlidingWindowAggregatorFactory {
+
+ /** comparators used for MonotonicQueueSlidingWindowAggregator */
+ private static final Map<TSDataType, Comparator<Column>> maxComparators =
new HashMap<>();
+
+ private static final Map<TSDataType, Comparator<Column>> minComparators =
new HashMap<>();
+ private static final Map<TSDataType, Comparator<Column>> extremeComparators
= new HashMap<>();
+
+ static {
+ // return a value greater than 0 if o1 is numerically greater than o2
+ maxComparators.put(TSDataType.INT32, Comparator.comparingInt(o ->
o.getInt(0)));
+ maxComparators.put(TSDataType.INT64, Comparator.comparingLong(o ->
o.getLong(0)));
+ maxComparators.put(TSDataType.FLOAT, Comparator.comparing(o ->
o.getFloat(0)));
+ maxComparators.put(TSDataType.DOUBLE, Comparator.comparingDouble(o ->
o.getDouble(0)));
+
+ // return a value greater than 0 if o1 is numerically less than o2
+ minComparators.put(TSDataType.INT32, (o1, o2) ->
Integer.compare(o2.getInt(0), o1.getInt(0)));
+ minComparators.put(TSDataType.INT64, (o1, o2) ->
Long.compare(o2.getLong(0), o1.getLong(0)));
+ minComparators.put(TSDataType.FLOAT, (o1, o2) ->
Float.compare(o2.getFloat(0), o1.getFloat(0)));
+ minComparators.put(
+ TSDataType.DOUBLE, (o1, o2) -> Double.compare(o2.getDouble(0),
o1.getDouble(0)));
+
+ // return a value greater than 0 if abs(o1) is numerically greater than
abs(o2)
+ // if abs(o1) == abs(o2), return a value greater than 0 if o1 is
numerically greater than o2
+ extremeComparators.put(
+ TSDataType.INT32,
+ (o1, o2) -> {
+ int value1 = o1.getInt(0);
+ int value2 = o2.getInt(0);
+ if (Math.abs(value1) > Math.abs(value2)
+ || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) {
+ return 1;
+ } else if (value1 == value2) {
+ return 0;
+ }
+ return -1;
+ });
+ extremeComparators.put(
+ TSDataType.INT64,
+ (o1, o2) -> {
+ long value1 = o1.getLong(0);
+ long value2 = o2.getLong(0);
+ if (Math.abs(value1) > Math.abs(value2)
+ || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) {
+ return 1;
+ } else if (value1 == value2) {
+ return 0;
+ }
+ return -1;
+ });
+ extremeComparators.put(
+ TSDataType.FLOAT,
+ (o1, o2) -> {
+ float value1 = o1.getFloat(0);
+ float value2 = o2.getFloat(0);
+ if (Math.abs(value1) > Math.abs(value2)
+ || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) {
+ return 1;
+ } else if (value1 == value2) {
+ return 0;
+ }
+ return -1;
+ });
+ extremeComparators.put(
+ TSDataType.DOUBLE,
+ (o1, o2) -> {
+ double value1 = o1.getDouble(0);
+ double value2 = o2.getDouble(0);
+ if (Math.abs(value1) > Math.abs(value2)
+ || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) {
+ return 1;
+ } else if (value1 == value2) {
+ return 0;
+ }
+ return -1;
+ });
+ }
+
+ public static SlidingWindowAggregator createSlidingWindowAggregator(
+ AggregationType aggregationType,
+ TSDataType dataType,
+ boolean ascending,
+ List<InputLocation[]> inputLocationList,
+ AggregationStep step) {
+ Accumulator accumulator =
+ AccumulatorFactory.createAccumulator(aggregationType, dataType,
ascending);
+ switch (aggregationType) {
+ case SUM:
+ case AVG:
+ case COUNT:
+ return new SmoothQueueSlidingWindowAggregator(accumulator,
inputLocationList, step);
+ case MAX_VALUE:
+ return new MonotonicQueueSlidingWindowAggregator(
+ accumulator, inputLocationList, step,
maxComparators.get(dataType));
+ case MIN_VALUE:
+ return new MonotonicQueueSlidingWindowAggregator(
+ accumulator, inputLocationList, step,
minComparators.get(dataType));
+ case EXTREME:
+ return new MonotonicQueueSlidingWindowAggregator(
+ accumulator, inputLocationList, step,
extremeComparators.get(dataType));
+ case MIN_TIME:
+ case FIRST_VALUE:
+ return !ascending
+ ? new EmptyQueueSlidingWindowAggregator(accumulator,
inputLocationList, step)
+ : new NormalQueueSlidingWindowAggregator(accumulator,
inputLocationList, step);
+ case MAX_TIME:
+ case LAST_VALUE:
+ return !ascending
+ ? new NormalQueueSlidingWindowAggregator(accumulator,
inputLocationList, step)
+ : new EmptyQueueSlidingWindowAggregator(accumulator,
inputLocationList, step);
+ default:
+ throw new IllegalArgumentException("Invalid Aggregation Type: " +
aggregationType);
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SmoothQueueSlidingWindowAggregator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SmoothQueueSlidingWindowAggregator.java
new file mode 100644
index 0000000000..0d82e0169d
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SmoothQueueSlidingWindowAggregator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.slidingwindow;
+
+import org.apache.iotdb.db.mpp.aggregation.Accumulator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+
+import java.util.List;
+
+/**
+ * The aggregation result is calculated from all pre-aggregation results in
the currently maintained
+ * queue when calculating the COUNT, SUM, and AVG.
+ */
+public class SmoothQueueSlidingWindowAggregator extends
SlidingWindowAggregator {
+ public SmoothQueueSlidingWindowAggregator(
+ Accumulator accumulator, List<InputLocation[]> inputLocationList,
AggregationStep step) {
+ super(accumulator, inputLocationList, step);
+ }
+
+ @Override
+ protected void evictingExpiredValue() {
+ if (!deque.isEmpty() && !curTimeRange.contains(deque.getLast().getTime()))
{
+ this.accumulator.reset();
+ return;
+ }
+ while (!deque.isEmpty() &&
!curTimeRange.contains(deque.getFirst().getTime())) {
+ PartialAggregationResult partialResult = deque.removeFirst();
+ this.accumulator.addIntermediate(partialResult.opposite());
+ }
+ }
+
+ @Override
+ public void processPartialResult(PartialAggregationResult partialResult) {
+ if (!partialResult.isNull()) {
+ deque.addLast(partialResult);
+ this.accumulator.addIntermediate(partialResult.getPartialResult());
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 23ba0d1b57..3b5beb32eb 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -74,7 +74,7 @@ public class AggregationOperator implements ProcessOperator {
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
tsBlockBuilder = new TsBlockBuilder(dataTypes);
- this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter,
ascending);
+ this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter,
ascending, true);
}
@Override
@@ -128,7 +128,7 @@ public class AggregationOperator implements ProcessOperator
{
public static TsBlock updateResultTsBlockFromAggregators(
TsBlockBuilder tsBlockBuilder,
- List<Aggregator> aggregators,
+ List<? extends Aggregator> aggregators,
ITimeRangeIterator timeRangeIterator) {
tsBlockBuilder.reset();
TimeColumnBuilder timeColumnBuilder =
tsBlockBuilder.getTimeColumnBuilder();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index fa724b12ab..3753635e7f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -79,7 +79,7 @@ public class RawDataAggregationOperator implements
ProcessOperator {
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
tsBlockBuilder = new TsBlockBuilder(dataTypes);
- this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter,
ascending);
+ this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter,
ascending, true);
}
@Override
@@ -98,7 +98,7 @@ public class RawDataAggregationOperator implements
ProcessOperator {
curTimeRange = timeRangeIterator.nextTimeRange();
for (Aggregator aggregator : aggregators) {
aggregator.reset();
- aggregator.setTimeRange(curTimeRange);
+ aggregator.updateTimeRange(curTimeRange);
}
// 2. Calculate aggregation result based on current time window
@@ -170,7 +170,7 @@ public class RawDataAggregationOperator implements
ProcessOperator {
return tsBlock.subTsBlock(tsBlockIterator.getRowIndex());
}
- private boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean
ascending) {
+ public static boolean satisfied(TsBlock tsBlock, TimeRange timeRange,
boolean ascending) {
TsBlockSingleColumnIterator tsBlockIterator =
tsBlock.getTsBlockSingleColumnIterator();
if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
return false;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
new file mode 100644
index 0000000000..150699ccb3
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import
org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregator;
+import
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator.updateResultTsBlockFromAggregators;
+import static
org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator.satisfied;
+import static
org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator.skipOutOfTimeRangePoints;
+import static
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator.initTimeRangeIterator;
+
+public class SlidingWindowAggregationOperator implements ProcessOperator {
+
+ private final OperatorContext operatorContext;
+ private final Operator child;
+
+ private TsBlock cachedTsBlock;
+
+ private final List<SlidingWindowAggregator> aggregators;
+
+ private final ITimeRangeIterator timeRangeIterator;
+
+ private final boolean ascending;
+
+ private final TsBlockBuilder tsBlockBuilder;
+
+ public SlidingWindowAggregationOperator(
+ OperatorContext operatorContext,
+ List<SlidingWindowAggregator> aggregators,
+ Operator child,
+ boolean ascending,
+ GroupByTimeParameter groupByTimeParameter) {
+ checkArgument(
+ groupByTimeParameter != null,
+ "GroupByTimeParameter cannot be null in
SlidingWindowAggregationOperator");
+
+ this.operatorContext = operatorContext;
+ this.aggregators = aggregators;
+ this.child = child;
+ List<TSDataType> outputDataTypes = new ArrayList<>();
+ for (Aggregator aggregator : aggregators) {
+ outputDataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+ }
+ this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+ this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter,
ascending, false);
+ this.ascending = ascending;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return timeRangeIterator.hasNextTimeRange();
+ }
+
+ @Override
+ public TsBlock next() {
+ // 1. Clear previous aggregation result
+ TimeRange curTimeRange = timeRangeIterator.nextTimeRange();
+ for (SlidingWindowAggregator aggregator : aggregators) {
+ aggregator.updateTimeRange(curTimeRange);
+ }
+
+ // 2. Calculate aggregation result based on current time window
+ while (!calcFromTsBlock(cachedTsBlock, curTimeRange)) {
+ if (child.hasNext()) {
+ cachedTsBlock = child.next();
+ } else {
+ cachedTsBlock = null;
+ break;
+ }
+ }
+
+ // 3. Update result using aggregators
+ return updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators,
timeRangeIterator);
+ }
+
+ private boolean calcFromTsBlock(TsBlock inputTsBlock, TimeRange timeRange) {
+ // check if the batchData does not contain points in current interval
+ if (inputTsBlock != null && satisfied(inputTsBlock, timeRange, ascending))
{
+ // skip points that cannot be calculated
+ inputTsBlock = skipOutOfTimeRangePoints(inputTsBlock, timeRange,
ascending);
+ for (SlidingWindowAggregator aggregator : aggregators) {
+ aggregator.processTsBlock(inputTsBlock);
+ }
+ }
+ // The result is calculated from the cache
+ return inputTsBlock != null
+ && (ascending
+ ? inputTsBlock.getEndTime() > timeRange.getMax()
+ : inputTsBlock.getEndTime() < timeRange.getMin());
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return child.isBlocked();
+ }
+
+ @Override
+ public boolean isFinished() {
+ return !this.hasNext();
+ }
+
+ @Override
+ public void close() throws Exception {
+ child.close();
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
index 9ef84c165d..5187384560 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
@@ -94,7 +94,7 @@ public class AlignedSeriesAggregationScanOperator implements
DataSourceOperator
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
tsBlockBuilder = new TsBlockBuilder(dataTypes);
- this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter,
ascending);
+ this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter,
ascending, true);
}
@Override
@@ -125,7 +125,7 @@ public class AlignedSeriesAggregationScanOperator
implements DataSourceOperator
// 1. Clear previous aggregation result
for (Aggregator aggregator : aggregators) {
aggregator.reset();
- aggregator.setTimeRange(curTimeRange);
+ aggregator.updateTimeRange(curTimeRange);
}
// 2. Calculate aggregation result based on current time window
@@ -315,11 +315,9 @@ public class AlignedSeriesAggregationScanOperator
implements DataSourceOperator
calcFromBatch(tsBlock, curTimeRange);
// judge whether the calculation finished
- if (isEndCalc(aggregators)
- || (tsBlockIterator.hasNext()
- && (ascending
- ? tsBlockIterator.currentTime() > curTimeRange.getMax()
- : tsBlockIterator.currentTime() < curTimeRange.getMin()))) {
+ if (isEndCalc(aggregators) || ascending
+ ? tsBlock.getEndTime() > curTimeRange.getMax()
+ : tsBlock.getEndTime() < curTimeRange.getMin()) {
return true;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
index c234a7c2ef..807b1a2ebd 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
@@ -101,7 +101,7 @@ public class SeriesAggregationScanOperator implements
DataSourceOperator {
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
tsBlockBuilder = new TsBlockBuilder(dataTypes);
- this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter,
ascending);
+ this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter,
ascending, true);
}
/**
@@ -110,7 +110,7 @@ public class SeriesAggregationScanOperator implements
DataSourceOperator {
* timestamp, so it doesn't matter what the time range returns.
*/
public static ITimeRangeIterator initTimeRangeIterator(
- GroupByTimeParameter groupByTimeParameter, boolean ascending) {
+ GroupByTimeParameter groupByTimeParameter, boolean ascending, boolean
isPreAggr) {
if (groupByTimeParameter == null) {
return new SingleTimeWindowIterator(0, Long.MAX_VALUE);
} else {
@@ -123,7 +123,7 @@ public class SeriesAggregationScanOperator implements
DataSourceOperator {
groupByTimeParameter.isIntervalByMonth(),
groupByTimeParameter.isSlidingStepByMonth(),
groupByTimeParameter.isLeftCRightO(),
- groupByTimeParameter.getInterval() >
groupByTimeParameter.getSlidingStep());
+ isPreAggr);
}
}
@@ -155,7 +155,7 @@ public class SeriesAggregationScanOperator implements
DataSourceOperator {
// 1. Clear previous aggregation result
for (Aggregator aggregator : aggregators) {
aggregator.reset();
- aggregator.setTimeRange(curTimeRange);
+ aggregator.updateTimeRange(curTimeRange);
}
// 2. Calculate aggregation result based on current time window
@@ -335,11 +335,9 @@ public class SeriesAggregationScanOperator implements
DataSourceOperator {
calcFromBatch(tsBlock, curTimeRange);
// judge whether the calculation finished
- if (isEndCalc(aggregators)
- || (tsBlockIterator.hasNext()
- && (ascending
- ? tsBlockIterator.currentTime() > curTimeRange.getMax()
- : tsBlockIterator.currentTime() < curTimeRange.getMin()))) {
+ if (isEndCalc(aggregators) || ascending
+ ? tsBlock.getEndTime() > curTimeRange.getMax()
+ : tsBlock.getEndTime() < curTimeRange.getMin()) {
return true;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 9e7d99cae9..26d9306095 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import
org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregator;
+import
org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockManager;
import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockService;
@@ -51,6 +53,7 @@ import
org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
import
org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
import
org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
@@ -127,6 +130,7 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
@@ -784,6 +788,38 @@ public class LocalExecutionPlanner {
return super.visitGroupByLevel(node, context);
}
+ @Override
+ public Operator visitSlidingWindowAggregation(
+ SlidingWindowAggregationNode node, LocalExecutionPlanContext context) {
+ checkArgument(
+ node.getAggregationDescriptorList().size() >= 1,
+ "Aggregation descriptorList cannot be empty");
+ OperatorContext operatorContext =
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ SlidingWindowAggregationOperator.class.getSimpleName());
+ Operator child = node.getChild().accept(this, context);
+ boolean ascending = node.getScanOrder() == OrderBy.TIMESTAMP_ASC;
+ List<SlidingWindowAggregator> aggregators = new ArrayList<>();
+ Map<String, List<InputLocation>> layout = makeLayout(node);
+ for (AggregationDescriptor descriptor :
node.getAggregationDescriptorList()) {
+ List<InputLocation[]> inputLocationList =
calcInputLocationList(descriptor, layout);
+ aggregators.add(
+ SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
+ descriptor.getAggregationType(),
+ context
+ .getTypeProvider()
+ // get the type of first inputExpression
+
.getType(descriptor.getInputExpressions().get(0).toString()),
+ ascending,
+ inputLocationList,
+ descriptor.getStep()));
+ }
+ return new SlidingWindowAggregationOperator(
+ operatorContext, aggregators, child, ascending,
node.getGroupByTimeParameter());
+ }
+
@Override
public Operator visitLimit(LimitNode node, LocalExecutionPlanContext
context) {
Operator child = node.getChild().accept(this, context);
@@ -822,23 +858,7 @@ public class LocalExecutionPlanner {
List<Aggregator> aggregators = new ArrayList<>();
Map<String, List<InputLocation>> layout = makeLayout(node);
for (AggregationDescriptor descriptor :
node.getAggregationDescriptorList()) {
- List<String> inputColumnNames = descriptor.getInputColumnNames();
- // it may include double parts
- List<List<InputLocation>> inputLocationParts = new
ArrayList<>(inputColumnNames.size());
- inputColumnNames.forEach(o -> inputLocationParts.add(layout.get(o)));
-
- List<InputLocation[]> inputLocationList = new ArrayList<>();
- for (int i = 0; i < inputLocationParts.get(0).size(); i++) {
- if (inputColumnNames.size() == 1) {
- inputLocationList.add(new InputLocation[]
{inputLocationParts.get(0).get(i)});
- } else {
- inputLocationList.add(
- new InputLocation[] {
- inputLocationParts.get(0).get(i),
inputLocationParts.get(1).get(i)
- });
- }
- }
-
+ List<InputLocation[]> inputLocationList =
calcInputLocationList(descriptor, layout);
aggregators.add(
new Aggregator(
AccumulatorFactory.createAccumulator(
@@ -876,6 +896,27 @@ public class LocalExecutionPlanner {
}
}
+ private List<InputLocation[]> calcInputLocationList(
+ AggregationDescriptor descriptor, Map<String, List<InputLocation>>
layout) {
+ List<String> inputColumnNames = descriptor.getInputColumnNames();
+ // it may include double parts
+ List<List<InputLocation>> inputLocationParts = new
ArrayList<>(inputColumnNames.size());
+ inputColumnNames.forEach(o -> inputLocationParts.add(layout.get(o)));
+
+ List<InputLocation[]> inputLocationList = new ArrayList<>();
+ for (int i = 0; i < inputLocationParts.get(0).size(); i++) {
+ if (inputColumnNames.size() == 1) {
+ inputLocationList.add(new InputLocation[]
{inputLocationParts.get(0).get(i)});
+ } else {
+ inputLocationList.add(
+ new InputLocation[] {
+ inputLocationParts.get(0).get(i),
inputLocationParts.get(1).get(i)
+ });
+ }
+ }
+ return inputLocationList;
+ }
+
@Override
public Operator visitSort(SortNode node, LocalExecutionPlanContext
context) {
return super.visitSort(node, context);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index d60d210118..a22371f711 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -53,10 +53,10 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTimeNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
@@ -175,6 +175,7 @@ public class LogicalPlanBuilder {
: AggregationStep.SINGLE;
List<PlanNode> sourceNodeList = new ArrayList<>();
+ boolean needCheckAscending = groupByTimeParameter == null;
Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new
HashMap<>();
Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new
HashMap<>();
for (Expression sourceExpression : sourceExpressions) {
@@ -189,7 +190,8 @@ public class LogicalPlanBuilder {
}
PartialPath selectPath =
((TimeSeriesOperand)
sourceExpression.getExpressions().get(0)).getPath();
- if (SchemaUtils.isConsistentWithScanOrder(aggregationFunction,
scanOrder)) {
+ if (!needCheckAscending
+ || SchemaUtils.isConsistentWithScanOrder(aggregationFunction,
scanOrder)) {
ascendingAggregations
.computeIfAbsent(selectPath, key -> new ArrayList<>())
.add(aggregationDescriptor);
@@ -202,8 +204,6 @@ public class LogicalPlanBuilder {
Map<PartialPath, List<AggregationDescriptor>> groupedAscendingAggregations
=
MetaUtils.groupAlignedAggregations(ascendingAggregations);
- Map<PartialPath, List<AggregationDescriptor>>
groupedDescendingAggregations =
- MetaUtils.groupAlignedAggregations(descendingAggregations);
for (Map.Entry<PartialPath, List<AggregationDescriptor>>
pathAggregationsEntry :
groupedAscendingAggregations.entrySet()) {
sourceNodeList.add(
@@ -214,29 +214,38 @@ public class LogicalPlanBuilder {
groupByTimeParameter,
timeFilter));
}
- for (Map.Entry<PartialPath, List<AggregationDescriptor>>
pathAggregationsEntry :
- groupedDescendingAggregations.entrySet()) {
- sourceNodeList.add(
- createAggregationScanNode(
- pathAggregationsEntry.getKey(),
- pathAggregationsEntry.getValue(),
- scanOrder,
- groupByTimeParameter,
- timeFilter));
+
+ if (needCheckAscending) {
+ Map<PartialPath, List<AggregationDescriptor>>
groupedDescendingAggregations =
+ MetaUtils.groupAlignedAggregations(descendingAggregations);
+ for (Map.Entry<PartialPath, List<AggregationDescriptor>>
pathAggregationsEntry :
+ groupedDescendingAggregations.entrySet()) {
+ sourceNodeList.add(
+ createAggregationScanNode(
+ pathAggregationsEntry.getKey(),
+ pathAggregationsEntry.getValue(),
+ scanOrder,
+ groupByTimeParameter,
+ timeFilter));
+ }
}
if (curStep.isOutputPartial()) {
if (groupByTimeParameter != null && groupByTimeParameter.hasOverlap()) {
curStep =
groupByLevelExpressions != null ? AggregationStep.INTERMEDIATE :
AggregationStep.FINAL;
+
+ this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
+
this.root =
- createGroupByTimeNode(
- sourceNodeList, aggregationExpressions, groupByTimeParameter,
curStep);
+ createSlidingWindowAggregationNode(
+ this.getRoot(), aggregationExpressions, groupByTimeParameter,
curStep, scanOrder);
if (groupByLevelExpressions != null) {
curStep = AggregationStep.FINAL;
this.root =
- createGroupByTLevelNode(this.root.getChildren(),
groupByLevelExpressions, curStep);
+ createGroupByTLevelNode(
+ Collections.singletonList(this.getRoot()),
groupByLevelExpressions, curStep);
}
} else {
if (groupByLevelExpressions != null) {
@@ -311,7 +320,8 @@ public class LogicalPlanBuilder {
Set<Expression> aggregationExpressions,
GroupByTimeParameter groupByTimeParameter,
AggregationStep curStep,
- TypeProvider typeProvider) {
+ TypeProvider typeProvider,
+ OrderBy scanOrder) {
if (aggregationExpressions == null) {
return this;
}
@@ -329,39 +339,40 @@ public class LogicalPlanBuilder {
context.getQueryId().genPlanNodeId(),
Collections.singletonList(this.getRoot()),
aggregationDescriptorList,
- groupByTimeParameter);
+ groupByTimeParameter,
+ scanOrder);
return this;
}
- public LogicalPlanBuilder planGroupByTime(
+ public LogicalPlanBuilder planSlidingWindowAggregation(
Set<Expression> aggregationExpressions,
GroupByTimeParameter groupByTimeParameter,
- AggregationStep curStep) {
+ AggregationStep curStep,
+ OrderBy scanOrder) {
if (aggregationExpressions == null) {
return this;
}
this.root =
- createGroupByTimeNode(
- Collections.singletonList(this.getRoot()),
- aggregationExpressions,
- groupByTimeParameter,
- curStep);
+ createSlidingWindowAggregationNode(
+ this.getRoot(), aggregationExpressions, groupByTimeParameter,
curStep, scanOrder);
return this;
}
- private PlanNode createGroupByTimeNode(
- List<PlanNode> children,
+ private PlanNode createSlidingWindowAggregationNode(
+ PlanNode child,
Set<Expression> aggregationExpressions,
GroupByTimeParameter groupByTimeParameter,
- AggregationStep curStep) {
+ AggregationStep curStep,
+ OrderBy scanOrder) {
List<AggregationDescriptor> aggregationDescriptorList =
constructAggregationDescriptorList(aggregationExpressions, curStep);
- return new GroupByTimeNode(
+ return new SlidingWindowAggregationNode(
context.getQueryId().genPlanNodeId(),
- children,
+ child,
aggregationDescriptorList,
- groupByTimeParameter);
+ groupByTimeParameter,
+ scanOrder);
}
private PlanNode createGroupByTLevelNode(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index 0e6da8c705..4950a6427f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -221,7 +221,8 @@ public class LogicalPlanner {
aggregationExpressions,
analysis.getGroupByTimeParameter(),
curStep,
- analysis.getTypeProvider());
+ analysis.getTypeProvider(),
+ queryStatement.getResultOrder());
if (curStep.isOutputPartial()) {
if (queryStatement.isGroupByTime() &&
analysis.getGroupByTimeParameter().hasOverlap()) {
@@ -230,8 +231,11 @@ public class LogicalPlanner {
? AggregationStep.INTERMEDIATE
: AggregationStep.FINAL;
planBuilder =
- planBuilder.planGroupByTime(
- aggregationExpressions,
analysis.getGroupByTimeParameter(), curStep);
+ planBuilder.planSlidingWindowAggregation(
+ aggregationExpressions,
+ analysis.getGroupByTimeParameter(),
+ curStep,
+ queryStatement.getResultOrder());
}
if (queryStatement.isGroupByLevel()) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index c485ad136c..5d344f1f95 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -44,11 +44,11 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTimeNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
@@ -102,7 +102,7 @@ public enum PlanNodeType {
TIME_SERIES_COUNT((short) 28),
LEVEL_TIME_SERIES_COUNT((short) 29),
COUNT_MERGE((short) 30),
- GROUP_BY_TIME((short) 31),
+ SLIDING_WINDOW_AGGREGATION((short) 31),
PROJECT((short) 32),
ALIGNED_SERIES_SCAN((short) 33),
ALIGNED_SERIES_AGGREGATE_SCAN((short) 34),
@@ -208,7 +208,7 @@ public enum PlanNodeType {
case 30:
return CountSchemaMergeNode.deserialize(buffer);
case 31:
- return GroupByTimeNode.deserialize(buffer);
+ return SlidingWindowAggregationNode.deserialize(buffer);
case 32:
return ProjectNode.deserialize(buffer);
case 33:
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 2af623bfbe..5f5e09d228 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -44,11 +44,11 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTimeNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
@@ -114,7 +114,7 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
- public R visitGroupByTime(GroupByTimeNode node, C context) {
+ public R visitSlidingWindowAggregation(SlidingWindowAggregationNode node, C
context) {
return visitPlan(node, context);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index 70e21e3958..bfaf165b36 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -52,36 +52,33 @@ public class AggregationNode extends MultiChildNode {
// The parameter of `group by time`.
// Its value will be null if there is no `group by time` clause.
@Nullable protected GroupByTimeParameter groupByTimeParameter;
- protected OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
- public AggregationNode(
- PlanNodeId id,
- List<PlanNode> children,
- List<AggregationDescriptor> aggregationDescriptorList) {
- this(id, children, aggregationDescriptorList, null);
- }
+ protected OrderBy scanOrder;
public AggregationNode(
PlanNodeId id,
- List<PlanNode> children,
List<AggregationDescriptor> aggregationDescriptorList,
- @Nullable GroupByTimeParameter groupByTimeParameter) {
- super(id, children);
+ @Nullable GroupByTimeParameter groupByTimeParameter,
+ OrderBy scanOrder) {
+ super(id, new ArrayList<>());
this.aggregationDescriptorList =
getDeduplicatedDescriptors(aggregationDescriptorList);
this.groupByTimeParameter = groupByTimeParameter;
- }
-
- public AggregationNode(PlanNodeId id, List<AggregationDescriptor>
aggregationDescriptorList) {
- this(id, aggregationDescriptorList, null);
+ this.scanOrder = scanOrder;
}
public AggregationNode(
PlanNodeId id,
+ List<PlanNode> children,
List<AggregationDescriptor> aggregationDescriptorList,
- @Nullable GroupByTimeParameter groupByTimeParameter) {
- super(id, new ArrayList<>());
- this.aggregationDescriptorList =
getDeduplicatedDescriptors(aggregationDescriptorList);
- this.groupByTimeParameter = groupByTimeParameter;
+ @Nullable GroupByTimeParameter groupByTimeParameter,
+ OrderBy scanOrder) {
+ this(id, aggregationDescriptorList, groupByTimeParameter, scanOrder);
+ this.children = children;
+ }
+
+ @Deprecated
+ public AggregationNode(PlanNodeId id, List<AggregationDescriptor>
aggregationDescriptorList) {
+ this(id, aggregationDescriptorList, null, OrderBy.TIMESTAMP_ASC);
}
public List<AggregationDescriptor> getAggregationDescriptorList() {
@@ -115,7 +112,7 @@ public class AggregationNode extends MultiChildNode {
@Override
public PlanNode clone() {
return new AggregationNode(
- getPlanNodeId(), getAggregationDescriptorList(),
getGroupByTimeParameter());
+ getPlanNodeId(), getAggregationDescriptorList(),
getGroupByTimeParameter(), getScanOrder());
}
@Override
@@ -148,6 +145,7 @@ public class AggregationNode extends MultiChildNode {
ReadWriteIOUtils.write((byte) 1, byteBuffer);
groupByTimeParameter.serialize(byteBuffer);
}
+ ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
}
public static AggregationNode deserialize(ByteBuffer byteBuffer) {
@@ -162,8 +160,10 @@ public class AggregationNode extends MultiChildNode {
if (isNull == 1) {
groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
}
+ OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new AggregationNode(planNodeId, aggregationDescriptorList,
groupByTimeParameter);
+ return new AggregationNode(
+ planNodeId, aggregationDescriptorList, groupByTimeParameter,
scanOrder);
}
@Override
@@ -178,15 +178,15 @@ public class AggregationNode extends MultiChildNode {
return false;
}
AggregationNode that = (AggregationNode) o;
- return aggregationDescriptorList.equals(that.aggregationDescriptorList)
+ return Objects.equals(aggregationDescriptorList,
that.aggregationDescriptorList)
&& Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
- && Objects.equals(children, that.children);
+ && scanOrder == that.scanOrder;
}
@Override
public int hashCode() {
return Objects.hash(
- super.hashCode(), aggregationDescriptorList, groupByTimeParameter,
children);
+ super.hashCode(), aggregationDescriptorList, groupByTimeParameter,
scanOrder);
}
/**
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTimeNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
similarity index 72%
rename from
server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTimeNode.java
rename to
server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
index 36a5bc2b9b..d0df526b93 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTimeNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
@@ -25,9 +25,10 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import javax.annotation.Nullable;
+import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -35,65 +36,75 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
-public class GroupByTimeNode extends ProcessNode {
+public class SlidingWindowAggregationNode extends ProcessNode {
// The list of aggregate functions, each AggregateDescriptor will be output
as one column of
// result TsBlock
private final List<AggregationDescriptor> aggregationDescriptorList;
// The parameter of `group by time`.
- // Its value will be null if there is no `group by time` clause.
- @Nullable private final GroupByTimeParameter groupByTimeParameter;
+ private final GroupByTimeParameter groupByTimeParameter;
- private List<PlanNode> children;
+ protected OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
- public GroupByTimeNode(
+ private PlanNode child;
+
+ public SlidingWindowAggregationNode(
PlanNodeId id,
List<AggregationDescriptor> aggregationDescriptorList,
- @Nullable GroupByTimeParameter groupByTimeParameter) {
+ GroupByTimeParameter groupByTimeParameter,
+ OrderBy scanOrder) {
super(id);
this.aggregationDescriptorList = aggregationDescriptorList;
this.groupByTimeParameter = groupByTimeParameter;
- this.children = new ArrayList<>();
+ this.scanOrder = scanOrder;
}
- public GroupByTimeNode(
+ public SlidingWindowAggregationNode(
PlanNodeId id,
- List<PlanNode> children,
+ PlanNode child,
List<AggregationDescriptor> aggregationDescriptorList,
- GroupByTimeParameter groupByTimeParameter) {
- this(id, aggregationDescriptorList, groupByTimeParameter);
- this.children = children;
+ GroupByTimeParameter groupByTimeParameter,
+ OrderBy scanOrder) {
+ this(id, aggregationDescriptorList, groupByTimeParameter, scanOrder);
+ this.child = child;
}
public List<AggregationDescriptor> getAggregationDescriptorList() {
return aggregationDescriptorList;
}
- @Nullable
public GroupByTimeParameter getGroupByTimeParameter() {
return groupByTimeParameter;
}
+ public OrderBy getScanOrder() {
+ return scanOrder;
+ }
+
+ public PlanNode getChild() {
+ return child;
+ }
+
@Override
public List<PlanNode> getChildren() {
- return children;
+ return ImmutableList.of(child);
}
@Override
public void addChild(PlanNode child) {
- this.children.add(child);
+ this.child = child;
}
@Override
public int allowedChildCount() {
- return CHILD_COUNT_NO_LIMIT;
+ return ONE_CHILD;
}
@Override
public PlanNode clone() {
- return new GroupByTimeNode(
- getPlanNodeId(), getAggregationDescriptorList(),
getGroupByTimeParameter());
+ return new SlidingWindowAggregationNode(
+ getPlanNodeId(), getAggregationDescriptorList(),
getGroupByTimeParameter(), getScanOrder());
}
@Override
@@ -106,12 +117,12 @@ public class GroupByTimeNode extends ProcessNode {
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitGroupByTime(this, context);
+ return visitor.visitSlidingWindowAggregation(this, context);
}
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.GROUP_BY_TIME.serialize(byteBuffer);
+ PlanNodeType.SLIDING_WINDOW_AGGREGATION.serialize(byteBuffer);
ReadWriteIOUtils.write(aggregationDescriptorList.size(), byteBuffer);
for (AggregationDescriptor aggregationDescriptor :
aggregationDescriptorList) {
aggregationDescriptor.serialize(byteBuffer);
@@ -122,9 +133,10 @@ public class GroupByTimeNode extends ProcessNode {
ReadWriteIOUtils.write((byte) 1, byteBuffer);
groupByTimeParameter.serialize(byteBuffer);
}
+ ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
}
- public static GroupByTimeNode deserialize(ByteBuffer byteBuffer) {
+ public static SlidingWindowAggregationNode deserialize(ByteBuffer
byteBuffer) {
int descriptorSize = ReadWriteIOUtils.readInt(byteBuffer);
List<AggregationDescriptor> aggregationDescriptorList = new ArrayList<>();
while (descriptorSize > 0) {
@@ -136,8 +148,10 @@ public class GroupByTimeNode extends ProcessNode {
if (isNull == 1) {
groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
}
+ OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new GroupByTimeNode(planNodeId, aggregationDescriptorList,
groupByTimeParameter);
+ return new SlidingWindowAggregationNode(
+ planNodeId, aggregationDescriptorList, groupByTimeParameter,
scanOrder);
}
@Override
@@ -151,15 +165,14 @@ public class GroupByTimeNode extends ProcessNode {
if (!super.equals(o)) {
return false;
}
- GroupByTimeNode that = (GroupByTimeNode) o;
+ SlidingWindowAggregationNode that = (SlidingWindowAggregationNode) o;
return Objects.equals(aggregationDescriptorList,
that.aggregationDescriptorList)
&& Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
- && Objects.equals(children, that.children);
+ && Objects.equals(child, that.child);
}
@Override
public int hashCode() {
- return Objects.hash(
- super.hashCode(), aggregationDescriptorList, groupByTimeParameter,
children);
+ return Objects.hash(super.hashCode(), aggregationDescriptorList,
groupByTimeParameter, child);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
b/server/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 476ebbfb41..1a82376149 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.db.exception.StorageGroupNotReadyException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.exception.sql.SQLParserException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
@@ -119,6 +120,8 @@ public class ErrorHandlingUtils {
return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(),
rootCause.getMessage());
} else if (t instanceof TsFileRuntimeException) {
return RpcUtils.getStatus(TSStatusCode.TSFILE_PROCESSOR_ERROR,
rootCause.getMessage());
+ } else if (t instanceof SemanticException) {
+ return RpcUtils.getStatus(TSStatusCode.SEMANTIC_ERROR,
rootCause.getMessage());
}
return null;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
index ecd646ca6e..e0b8f0f020 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.utils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -109,6 +110,10 @@ public class TypeInferenceUtils {
if (aggrFuncName == null) {
throw new IllegalArgumentException("AggregateFunction Name must not be
null");
}
+ if (!verifyIsAggregationDataTypeMatched(aggrFuncName, dataType)) {
+ throw new SemanticException(
+ "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE, MAX_VALUE] only
support numeric data types [INT32, INT64, FLOAT, DOUBLE]");
+ }
switch (aggrFuncName.toLowerCase()) {
case SQLConstant.MIN_TIME:
@@ -128,4 +133,24 @@ public class TypeInferenceUtils {
throw new IllegalArgumentException("Invalid Aggregation function: " +
aggrFuncName);
}
}
+
+ private static boolean verifyIsAggregationDataTypeMatched(
+ String aggrFuncName, TSDataType dataType) {
+ switch (aggrFuncName.toLowerCase()) {
+ case SQLConstant.AVG:
+ case SQLConstant.SUM:
+ case SQLConstant.EXTREME:
+ case SQLConstant.MIN_VALUE:
+ case SQLConstant.MAX_VALUE:
+ return dataType.isNumeric();
+ case SQLConstant.COUNT:
+ case SQLConstant.MIN_TIME:
+ case SQLConstant.MAX_TIME:
+ case SQLConstant.FIRST_VALUE:
+ case SQLConstant.LAST_VALUE:
+ return true;
+ default:
+ throw new IllegalArgumentException("Invalid Aggregation function: " +
aggrFuncName);
+ }
+ }
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
index eaf58a6f22..35cd0e25bb 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
@@ -68,7 +68,7 @@ public class AggregationOperatorTest {
private final List<TsFileResource> seqResources = new ArrayList<>();
private final List<TsFileResource> unSeqResources = new ArrayList<>();
private ExecutorService instanceNotificationExecutor =
- IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");;
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
@Before
public void setUp() throws MetadataException, IOException,
WriteProcessException {
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
new file mode 100644
index 0000000000..ee0deb4df5
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import
org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregator;
+import
org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import
org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.constant.TestConstant.count;
+import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+
+public class SlidingWindowAggregationOperatorTest {
+
+ private static final String AGGREGATION_OPERATOR_TEST_SG =
+ "root.SlidingWindowAggregationOperatorTest";
+ private final List<String> deviceIds = new ArrayList<>();
+ private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+ private final List<TsFileResource> seqResources = new ArrayList<>();
+ private final List<TsFileResource> unSeqResources = new ArrayList<>();
+ private ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+
+ private final List<AggregationType> leafAggregationTypes =
+ Arrays.asList(
+ AggregationType.COUNT,
+ AggregationType.SUM,
+ AggregationType.LAST_VALUE,
+ AggregationType.FIRST_VALUE,
+ AggregationType.MAX_VALUE,
+ AggregationType.MIN_VALUE);
+
+ private final List<AggregationType> rootAggregationTypes =
+ Arrays.asList(
+ AggregationType.COUNT,
+ AggregationType.AVG,
+ AggregationType.SUM,
+ AggregationType.LAST_VALUE,
+ AggregationType.MIN_TIME,
+ AggregationType.MAX_TIME,
+ AggregationType.FIRST_VALUE,
+ AggregationType.MAX_VALUE,
+ AggregationType.MIN_VALUE);
+
+ private final List<List<List<InputLocation>>> inputLocations =
+ Arrays.asList(
+ Collections.singletonList(Collections.singletonList(new
InputLocation(0, 0))),
+ Collections.singletonList(
+ Arrays.asList(new InputLocation(0, 0), new InputLocation(0, 1))),
+ Collections.singletonList(Collections.singletonList(new
InputLocation(0, 1))),
+ Collections.singletonList(
+ Arrays.asList(new InputLocation(0, 2), new InputLocation(0, 3))),
+ Collections.singletonList(Collections.singletonList(new
InputLocation(0, 5))),
+ Collections.singletonList(Collections.singletonList(new
InputLocation(0, 3))),
+ Collections.singletonList(
+ Arrays.asList(new InputLocation(0, 4), new InputLocation(0, 5))),
+ Collections.singletonList(Collections.singletonList(new
InputLocation(0, 6))),
+ Collections.singletonList(Collections.singletonList(new
InputLocation(0, 7))));
+
+ private final GroupByTimeParameter groupByTimeParameter =
+ new GroupByTimeParameter(0, 300, 100, 50, true);
+
+ @Before
+ public void setUp() throws MetadataException, IOException,
WriteProcessException {
+ SeriesReaderTestUtil.setUp(
+ measurementSchemas, deviceIds, seqResources, unSeqResources,
AGGREGATION_OPERATOR_TEST_SG);
+ this.instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ instanceNotificationExecutor.shutdown();
+ }
+
+ @Test
+ public void slidingWindowAggregationTest() throws IllegalPathException {
+ String[] retArray =
+ new String[] {
+ "0,100,20049.5,2004950.0,20099,0,99,20000,20099,20000",
+ "50,100,20099.5,2009950.0,20149,50,149,20050,20149,20050",
+ "100,100,20149.5,2014950.0,20199,100,199,20100,20199,20100",
+ "150,100,15199.5,1519950.0,10249,150,249,20150,20199,10200",
+ "200,100,6249.5,624950.0,299,200,299,10200,10259,260",
+ "250,50,2274.5,113725.0,299,250,299,10250,10259,260",
+ };
+
+ SlidingWindowAggregationOperator slidingWindowAggregationOperator1 =
+ initSlidingWindowAggregationOperator(true);
+ int count = 0;
+ while (slidingWindowAggregationOperator1.hasNext()) {
+ TsBlock resultTsBlock = slidingWindowAggregationOperator1.next();
+ Assert.assertEquals(rootAggregationTypes.size(),
resultTsBlock.getValueColumnCount());
+ Assert.assertEquals(retArray[count], getResultString(resultTsBlock));
+ count++;
+ }
+ Assert.assertEquals(retArray.length, count);
+
+ SlidingWindowAggregationOperator slidingWindowAggregationOperator2 =
+ initSlidingWindowAggregationOperator(false);
+ while (slidingWindowAggregationOperator2.hasNext()) {
+ TsBlock resultTsBlock = slidingWindowAggregationOperator2.next();
+ Assert.assertEquals(rootAggregationTypes.size(),
resultTsBlock.getValueColumnCount());
+ Assert.assertEquals(retArray[count - 1], getResultString(resultTsBlock));
+ count--;
+ }
+ Assert.assertEquals(0, count);
+ }
+
+ private String getResultString(TsBlock resultTsBlock) {
+ return resultTsBlock.getTimeColumn().getLong(0)
+ + ","
+ + resultTsBlock.getColumn(0).getLong(0)
+ + ","
+ + resultTsBlock.getColumn(1).getDouble(0)
+ + ","
+ + resultTsBlock.getColumn(2).getDouble(0)
+ + ","
+ + resultTsBlock.getColumn(3).getInt(0)
+ + ","
+ + resultTsBlock.getColumn(4).getLong(0)
+ + ","
+ + resultTsBlock.getColumn(5).getLong(0)
+ + ","
+ + resultTsBlock.getColumn(6).getInt(0)
+ + ","
+ + resultTsBlock.getColumn(7).getInt(0)
+ + ","
+ + resultTsBlock.getColumn(8).getInt(0);
+ }
+
+ private SlidingWindowAggregationOperator
initSlidingWindowAggregationOperator(boolean ascending)
+ throws IllegalPathException {
+ // Construct operator tree
+ QueryId queryId = new QueryId("test");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId sourceId = queryId.genPlanNodeId();
+ fragmentInstanceContext.addOperatorContext(
+ 0, sourceId, SeriesAggregationScanOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 1, queryId.genPlanNodeId(),
SlidingWindowAggregationOperator.class.getSimpleName());
+
+ MeasurementPath d0s0 =
+ new MeasurementPath(AGGREGATION_OPERATOR_TEST_SG + ".device0.sensor0",
TSDataType.INT32);
+
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(leafAggregationTypes,
TSDataType.INT32, ascending)
+ .forEach(
+ accumulator -> aggregators.add(new Aggregator(accumulator,
AggregationStep.PARTIAL)));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator =
+ new SeriesAggregationScanOperator(
+ sourceId,
+ d0s0,
+ Collections.singleton("sensor0"),
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ aggregators,
+ null,
+ ascending,
+ groupByTimeParameter);
+ seriesAggregationScanOperator.initQueryDataSource(
+ new QueryDataSource(seqResources, unSeqResources));
+
+ List<SlidingWindowAggregator> finalAggregators = new ArrayList<>();
+ for (int i = 0; i < rootAggregationTypes.size(); i++) {
+ finalAggregators.add(
+ SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
+ rootAggregationTypes.get(i),
+ TSDataType.INT32,
+ ascending,
+ inputLocations.get(i).stream()
+ .map(tmpInputLocations -> tmpInputLocations.toArray(new
InputLocation[0]))
+ .collect(Collectors.toList()),
+ AggregationStep.FINAL));
+ }
+
+ return new SlidingWindowAggregationOperator(
+ fragmentInstanceContext.getOperatorContexts().get(1),
+ finalAggregators,
+ seriesAggregationScanOperator,
+ ascending,
+ groupByTimeParameter);
+ }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index e58bf4af47..46f7b07683 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -828,7 +828,9 @@ public class QueryLogicalPlanUtil {
AggregationType.LAST_VALUE,
AggregationStep.PARTIAL,
Collections.singletonList(
- new
TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))));
+ new
TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))),
+ null,
+ OrderBy.TIMESTAMP_ASC);
GroupByLevelNode groupByLevelNode =
new GroupByLevelNode(
@@ -930,7 +932,9 @@ public class QueryLogicalPlanUtil {
AggregationType.LAST_VALUE,
AggregationStep.SINGLE,
Collections.singletonList(
- new
TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))));
+ new
TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))),
+ null,
+ OrderBy.TIMESTAMP_DESC);
List<PlanNode> sourceNodeList2 = new ArrayList<>();
sourceNodeList2.add(
@@ -986,7 +990,9 @@ public class QueryLogicalPlanUtil {
AggregationType.LAST_VALUE,
AggregationStep.SINGLE,
Collections.singletonList(
- new
TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))));
+ new
TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
+ null,
+ OrderBy.TIMESTAMP_DESC);
Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2, 3));
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java
index 4a770db76a..7a42e4c85c 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java
@@ -75,7 +75,8 @@ public class AggregationNodeSerdeTest {
AggregationStep.FINAL,
Collections.singletonList(
new TimeSeriesOperand(new
PartialPath("root.sg.d1.s1"))))),
- groupByTimeParameter);
+ groupByTimeParameter,
+ OrderBy.TIMESTAMP_ASC);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
aggregationNode.serialize(byteBuffer);
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 6d72f1d7be..85ce2f54b9 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -84,6 +84,7 @@ public enum TSStatusCode {
WRITE_PROCESS_REJECT(413),
QUERY_ID_NOT_EXIST(414),
SNAPSHOT_DIR_NOT_LEGAL(415),
+ SEMANTIC_ERROR(416),
UNSUPPORTED_INDEX_FUNC_ERROR(421),
UNSUPPORTED_INDEX_TYPE_ERROR(422),