This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e368d4eef [IOTDB-3081] Implementation of 
SlidingWindowAggregationOperator (#5986)
5e368d4eef is described below

commit 5e368d4eef6925c616821bd2543092079c676ba4
Author: liuminghui233 <[email protected]>
AuthorDate: Thu May 26 19:34:41 2022 +0800

    [IOTDB-3081] Implementation of SlidingWindowAggregationOperator (#5986)
---
 .../iotdb/db/mpp/aggregation/Aggregator.java       |  22 +-
 .../iotdb/db/mpp/aggregation/AvgAccumulator.java   |   6 +
 .../db/mpp/aggregation/MinTimeDescAccumulator.java |   2 +-
 .../EmptyQueueSlidingWindowAggregator.java         |  58 +++++
 .../MonotonicQueueSlidingWindowAggregator.java     |  76 +++++++
 .../NormalQueueSlidingWindowAggregator.java        |  61 ++++++
 .../slidingwindow/SlidingWindowAggregator.java     | 143 ++++++++++++
 .../SlidingWindowAggregatorFactory.java            | 149 +++++++++++++
 .../SmoothQueueSlidingWindowAggregator.java        |  57 +++++
 .../operator/process/AggregationOperator.java      |   4 +-
 .../process/RawDataAggregationOperator.java        |   6 +-
 .../process/SlidingWindowAggregationOperator.java  | 144 ++++++++++++
 .../AlignedSeriesAggregationScanOperator.java      |  12 +-
 .../source/SeriesAggregationScanOperator.java      |  16 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  75 +++++--
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  73 +++---
 .../iotdb/db/mpp/plan/planner/LogicalPlanner.java  |  10 +-
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   6 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   4 +-
 .../planner/plan/node/process/AggregationNode.java |  46 ++--
 ...Node.java => SlidingWindowAggregationNode.java} |  67 +++---
 .../apache/iotdb/db/utils/ErrorHandlingUtils.java  |   3 +
 .../apache/iotdb/db/utils/TypeInferenceUtils.java  |  25 +++
 .../operator/AggregationOperatorTest.java          |   2 +-
 .../SlidingWindowAggregationOperatorTest.java      | 244 +++++++++++++++++++++
 .../db/mpp/plan/plan/QueryLogicalPlanUtil.java     |  12 +-
 .../node/process/AggregationNodeSerdeTest.java     |   3 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 28 files changed, 1183 insertions(+), 144 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index b9ccc4c7b3..d99d71a153 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -34,12 +34,12 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 public class Aggregator {
 
-  private final Accumulator accumulator;
+  protected final Accumulator accumulator;
   // In some intermediate result input, inputLocation[] should include two 
columns
-  private List<InputLocation[]> inputLocationList;
-  private final AggregationStep step;
+  protected List<InputLocation[]> inputLocationList;
+  protected final AggregationStep step;
 
-  private TimeRange timeRange = new TimeRange(0, Long.MAX_VALUE);
+  protected TimeRange curTimeRange = new TimeRange(0, Long.MAX_VALUE);
 
   // Used for SeriesAggregateScanOperator
   public Aggregator(Accumulator accumulator, AggregationStep step) {
@@ -61,7 +61,7 @@ public class Aggregator {
         step.isInputRaw(),
         "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can 
only process raw input");
     if (inputLocationList == null) {
-      accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange);
+      accumulator.addInput(tsBlock.getTimeAndValueColumn(0), curTimeRange);
     } else {
       for (InputLocation[] inputLocations : inputLocationList) {
         checkArgument(
@@ -70,7 +70,7 @@ public class Aggregator {
         Column[] timeValueColumn = new Column[2];
         timeValueColumn[0] = tsBlock.getTimeColumn();
         timeValueColumn[1] = 
tsBlock.getColumn(inputLocations[0].getValueColumnIndex());
-        accumulator.addInput(timeValueColumn, timeRange);
+        accumulator.addInput(timeValueColumn, curTimeRange);
       }
     }
   }
@@ -129,7 +129,7 @@ public class Aggregator {
   }
 
   public void reset() {
-    timeRange = new TimeRange(0, Long.MAX_VALUE);
+    curTimeRange = new TimeRange(0, Long.MAX_VALUE);
     accumulator.reset();
   }
 
@@ -137,11 +137,11 @@ public class Aggregator {
     return accumulator.hasFinalResult();
   }
 
-  public void setTimeRange(TimeRange timeRange) {
-    this.timeRange = timeRange;
+  public void updateTimeRange(TimeRange curTimeRange) {
+    this.curTimeRange = curTimeRange;
   }
 
-  public TimeRange getTimeRange() {
-    return timeRange;
+  public TimeRange getCurTimeRange() {
+    return curTimeRange;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
index 09dcf14628..4f1e9cd8a6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
@@ -74,6 +74,9 @@ public class AvgAccumulator implements Accumulator {
     initResult = true;
     countValue += partialResult[0].getLong(0);
     sumValue += partialResult[1].getDouble(0);
+    if (countValue == 0) {
+      initResult = false;
+    }
   }
 
   @Override
@@ -85,6 +88,9 @@ public class AvgAccumulator implements Accumulator {
     } else {
       sumValue += statistics.getSumDoubleValue();
     }
+    if (countValue == 0) {
+      initResult = false;
+    }
   }
 
   // Set sumValue to finalResult and keep countValue equals to 1
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
index 615a05686c..8fe815eb7c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
@@ -28,7 +28,7 @@ public class MinTimeDescAccumulator extends 
MinTimeAccumulator {
   public void addInput(Column[] column, TimeRange timeRange) {
     for (int i = 0; i < column[0].getPositionCount(); i++) {
       long curTime = column[0].getLong(i);
-      if (timeRange.contains(curTime)) {
+      if (timeRange.contains(curTime) && !column[1].isNull(i)) {
         updateMinTime(curTime);
       }
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/EmptyQueueSlidingWindowAggregator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/EmptyQueueSlidingWindowAggregator.java
new file mode 100644
index 0000000000..52b009e922
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/EmptyQueueSlidingWindowAggregator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.slidingwindow;
+
+import org.apache.iotdb.db.mpp.aggregation.Accumulator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+
+import java.util.List;
+
+/**
+ * When calculating MAX_TIME and LAST_VALUE (the MIN_TIME and FIRST_VALUE in 
descending order), the
+ * aggregation result always appears in the most recent pre-aggregation 
result. So, we do not need
+ * to cache the previous pre-aggregated results in the queue.
+ */
+public class EmptyQueueSlidingWindowAggregator extends SlidingWindowAggregator 
{
+
+  private long lastTime;
+
+  public EmptyQueueSlidingWindowAggregator(
+      Accumulator accumulator, List<InputLocation[]> inputLocationList, 
AggregationStep step) {
+    super(accumulator, inputLocationList, step);
+  }
+
+  @Override
+  protected void evictingExpiredValue() {
+    if (lastTime != -1 && !curTimeRange.contains(lastTime)) {
+      this.accumulator.reset();
+      lastTime = -1;
+    }
+  }
+
+  @Override
+  public void processPartialResult(PartialAggregationResult partialResult) {
+    if (!partialResult.isNull()) {
+      this.accumulator.reset();
+      this.accumulator.addIntermediate(partialResult.getPartialResult());
+      lastTime = partialResult.getTime();
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/MonotonicQueueSlidingWindowAggregator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/MonotonicQueueSlidingWindowAggregator.java
new file mode 100644
index 0000000000..7706edfb2c
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/MonotonicQueueSlidingWindowAggregator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.slidingwindow;
+
+import org.apache.iotdb.db.mpp.aggregation.Accumulator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * When computing MAX_VALUE, MIN_VALUE, EXTREME, we only add partial 
aggregation results that
+ * maintain monotonicity to queue. The aggregation result always appears at 
the head of the queue.
+ */
+public class MonotonicQueueSlidingWindowAggregator extends 
SlidingWindowAggregator {
+
+  private final Comparator<Column> comparator;
+
+  public MonotonicQueueSlidingWindowAggregator(
+      Accumulator accumulator,
+      List<InputLocation[]> inputLocationList,
+      AggregationStep step,
+      Comparator<Column> comparator) {
+    super(accumulator, inputLocationList, step);
+    this.comparator = comparator;
+  }
+
+  @Override
+  protected void evictingExpiredValue() {
+    while (!deque.isEmpty() && 
!curTimeRange.contains(deque.getFirst().getTime())) {
+      deque.removeFirst();
+    }
+    this.accumulator.reset();
+    if (!deque.isEmpty()) {
+      this.accumulator.addIntermediate(deque.getFirst().getPartialResult());
+    }
+  }
+
+  @Override
+  public void processPartialResult(PartialAggregationResult partialResult) {
+    if (partialResult.isNull()) {
+      return;
+    }
+
+    while (!deque.isEmpty()
+        && comparator.compare(
+                partialResult.getPartialResult()[0], 
deque.getLast().getPartialResult()[0])
+            > 0) {
+      deque.removeLast();
+    }
+    deque.addLast(partialResult);
+    this.accumulator.reset();
+    if (!deque.isEmpty()) {
+      this.accumulator.addIntermediate(deque.getFirst().getPartialResult());
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/NormalQueueSlidingWindowAggregator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/NormalQueueSlidingWindowAggregator.java
new file mode 100644
index 0000000000..eecaa8cffe
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/NormalQueueSlidingWindowAggregator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.slidingwindow;
+
+import org.apache.iotdb.db.mpp.aggregation.Accumulator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+
+import java.util.List;
+
+/**
+ * When calculating MIN_TIME and FIRST_VALUE (MAX_TIME and LAST_VALUE in 
descending order), the
+ * aggregated result always appears at the head of the queue. We need to cache 
all pre-aggregated
+ * results in the queue.
+ */
+public class NormalQueueSlidingWindowAggregator extends 
SlidingWindowAggregator {
+
+  public NormalQueueSlidingWindowAggregator(
+      Accumulator accumulator, List<InputLocation[]> inputLocationList, 
AggregationStep step) {
+    super(accumulator, inputLocationList, step);
+  }
+
+  @Override
+  protected void evictingExpiredValue() {
+    while (!deque.isEmpty() && 
!curTimeRange.contains(deque.getFirst().getTime())) {
+      deque.removeFirst();
+    }
+    this.accumulator.reset();
+    if (!deque.isEmpty()) {
+      this.accumulator.addIntermediate(deque.getFirst().getPartialResult());
+    }
+  }
+
+  @Override
+  public void processPartialResult(PartialAggregationResult partialResult) {
+    if (!partialResult.isNull()) {
+      deque.addLast(partialResult);
+    }
+    this.accumulator.reset();
+    if (!deque.isEmpty()) {
+      this.accumulator.addIntermediate(deque.getFirst().getPartialResult());
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
new file mode 100644
index 0000000000..5ab0d0195e
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.slidingwindow;
+
+import org.apache.iotdb.db.mpp.aggregation.Accumulator;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public abstract class SlidingWindowAggregator extends Aggregator {
+
+  // cached partial aggregation result of pre-aggregate windows
+  protected Deque<PartialAggregationResult> deque;
+
+  public SlidingWindowAggregator(
+      Accumulator accumulator, List<InputLocation[]> inputLocationList, 
AggregationStep step) {
+    super(accumulator, step, inputLocationList);
+    this.deque = new LinkedList<>();
+  }
+
+  @Override
+  public void processTsBlock(TsBlock tsBlock) {
+    checkArgument(
+        step.isInputPartial(),
+        "Step in SlidingWindowAggregationOperator can only process partial 
result");
+    TimeColumn timeColumn = tsBlock.getTimeColumn();
+    Column[] valueColumn = new Column[inputLocationList.get(0).length];
+    for (int i = 0; i < inputLocationList.get(0).length; i++) {
+      InputLocation inputLocation = inputLocationList.get(0)[i];
+      checkArgument(
+          inputLocation.getTsBlockIndex() == 0,
+          "SlidingWindowAggregationOperator can only process one tsBlock 
input.");
+      valueColumn[i] = tsBlock.getColumn(inputLocation.getValueColumnIndex());
+    }
+    processPartialResult(new PartialAggregationResult(timeColumn, 
valueColumn));
+  }
+
+  @Override
+  public void updateTimeRange(TimeRange curTimeRange) {
+    this.curTimeRange = curTimeRange;
+    evictingExpiredValue();
+  }
+
+  /** evicting expired element in queue and reset expired aggregateResult */
+  protected abstract void evictingExpiredValue();
+
+  /** update queue and aggregateResult */
+  public abstract void processPartialResult(PartialAggregationResult 
partialResult);
+
+  protected static class PartialAggregationResult {
+
+    private final TimeColumn timeColumn;
+    private final Column[] partialResultColumns;
+
+    public PartialAggregationResult(TimeColumn timeColumn, Column[] 
partialResultColumns) {
+      this.timeColumn = timeColumn;
+      this.partialResultColumns = partialResultColumns;
+    }
+
+    public boolean isNull() {
+      return partialResultColumns[0].isNull(0);
+    }
+
+    public long getTime() {
+      return timeColumn.getLong(0);
+    }
+
+    public Column[] getPartialResult() {
+      return partialResultColumns;
+    }
+
+    public List<TSDataType> getDataTypes() {
+      return Arrays.stream(partialResultColumns)
+          .sequential()
+          .map(Column::getDataType)
+          .collect(Collectors.toList());
+    }
+
+    public Column[] opposite() {
+      List<TSDataType> dataTypes = getDataTypes();
+      TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(dataTypes);
+      ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+      Column[] results = new Column[partialResultColumns.length];
+      for (int i = 0; i < partialResultColumns.length; i++) {
+        switch (dataTypes.get(i)) {
+          case INT32:
+            columnBuilders[i].writeInt(partialResultColumns[i].getInt(0) * -1);
+            break;
+          case INT64:
+            columnBuilders[i].writeLong(partialResultColumns[i].getLong(0) * 
-1);
+            break;
+          case FLOAT:
+            columnBuilders[i].writeFloat(partialResultColumns[i].getFloat(0) * 
-1);
+            break;
+          case DOUBLE:
+            columnBuilders[i].writeDouble(partialResultColumns[i].getDouble(0) 
* -1);
+            break;
+          case TEXT:
+          case BOOLEAN:
+            throw new UnSupportedDataTypeException(
+                String.format("Unsupported data type in opposite : %s", 
dataTypes.get(i)));
+          default:
+            throw new IllegalArgumentException("Unknown data type: " + 
dataTypes.get(i));
+        }
+        results[i] = columnBuilders[i].build();
+      }
+      return results;
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
new file mode 100644
index 0000000000..a6dc525717
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.slidingwindow;
+
+import org.apache.iotdb.db.mpp.aggregation.Accumulator;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SlidingWindowAggregatorFactory {
+
+  /** comparators used for MonotonicQueueSlidingWindowAggregator */
+  private static final Map<TSDataType, Comparator<Column>> maxComparators = 
new HashMap<>();
+
+  private static final Map<TSDataType, Comparator<Column>> minComparators = 
new HashMap<>();
+  private static final Map<TSDataType, Comparator<Column>> extremeComparators 
= new HashMap<>();
+
+  static {
+    // return a value greater than 0 if o1 is numerically greater than o2
+    maxComparators.put(TSDataType.INT32, Comparator.comparingInt(o -> 
o.getInt(0)));
+    maxComparators.put(TSDataType.INT64, Comparator.comparingLong(o -> 
o.getLong(0)));
+    maxComparators.put(TSDataType.FLOAT, Comparator.comparing(o -> 
o.getFloat(0)));
+    maxComparators.put(TSDataType.DOUBLE, Comparator.comparingDouble(o -> 
o.getDouble(0)));
+
+    // return a value greater than 0 if o1 is numerically less than o2
+    minComparators.put(TSDataType.INT32, (o1, o2) -> 
Integer.compare(o2.getInt(0), o1.getInt(0)));
+    minComparators.put(TSDataType.INT64, (o1, o2) -> 
Long.compare(o2.getLong(0), o1.getLong(0)));
+    minComparators.put(TSDataType.FLOAT, (o1, o2) -> 
Float.compare(o2.getFloat(0), o1.getFloat(0)));
+    minComparators.put(
+        TSDataType.DOUBLE, (o1, o2) -> Double.compare(o2.getDouble(0), 
o1.getDouble(0)));
+
+    // return a value greater than 0 if abs(o1) is numerically greater than 
abs(o2)
+    // if abs(o1) == abs(o2), return a value greater than 0 if o1 is 
numerically greater than o2
+    extremeComparators.put(
+        TSDataType.INT32,
+        (o1, o2) -> {
+          int value1 = o1.getInt(0);
+          int value2 = o2.getInt(0);
+          if (Math.abs(value1) > Math.abs(value2)
+              || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) {
+            return 1;
+          } else if (value1 == value2) {
+            return 0;
+          }
+          return -1;
+        });
+    extremeComparators.put(
+        TSDataType.INT64,
+        (o1, o2) -> {
+          long value1 = o1.getLong(0);
+          long value2 = o2.getLong(0);
+          if (Math.abs(value1) > Math.abs(value2)
+              || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) {
+            return 1;
+          } else if (value1 == value2) {
+            return 0;
+          }
+          return -1;
+        });
+    extremeComparators.put(
+        TSDataType.FLOAT,
+        (o1, o2) -> {
+          float value1 = o1.getFloat(0);
+          float value2 = o2.getFloat(0);
+          if (Math.abs(value1) > Math.abs(value2)
+              || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) {
+            return 1;
+          } else if (value1 == value2) {
+            return 0;
+          }
+          return -1;
+        });
+    extremeComparators.put(
+        TSDataType.DOUBLE,
+        (o1, o2) -> {
+          double value1 = o1.getDouble(0);
+          double value2 = o2.getDouble(0);
+          if (Math.abs(value1) > Math.abs(value2)
+              || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) {
+            return 1;
+          } else if (value1 == value2) {
+            return 0;
+          }
+          return -1;
+        });
+  }
+
+  public static SlidingWindowAggregator createSlidingWindowAggregator(
+      AggregationType aggregationType,
+      TSDataType dataType,
+      boolean ascending,
+      List<InputLocation[]> inputLocationList,
+      AggregationStep step) {
+    Accumulator accumulator =
+        AccumulatorFactory.createAccumulator(aggregationType, dataType, 
ascending);
+    switch (aggregationType) {
+      case SUM:
+      case AVG:
+      case COUNT:
+        return new SmoothQueueSlidingWindowAggregator(accumulator, 
inputLocationList, step);
+      case MAX_VALUE:
+        return new MonotonicQueueSlidingWindowAggregator(
+            accumulator, inputLocationList, step, 
maxComparators.get(dataType));
+      case MIN_VALUE:
+        return new MonotonicQueueSlidingWindowAggregator(
+            accumulator, inputLocationList, step, 
minComparators.get(dataType));
+      case EXTREME:
+        return new MonotonicQueueSlidingWindowAggregator(
+            accumulator, inputLocationList, step, 
extremeComparators.get(dataType));
+      case MIN_TIME:
+      case FIRST_VALUE:
+        return !ascending
+            ? new EmptyQueueSlidingWindowAggregator(accumulator, 
inputLocationList, step)
+            : new NormalQueueSlidingWindowAggregator(accumulator, 
inputLocationList, step);
+      case MAX_TIME:
+      case LAST_VALUE:
+        return !ascending
+            ? new NormalQueueSlidingWindowAggregator(accumulator, 
inputLocationList, step)
+            : new EmptyQueueSlidingWindowAggregator(accumulator, 
inputLocationList, step);
+      default:
+        throw new IllegalArgumentException("Invalid Aggregation Type: " + 
aggregationType);
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SmoothQueueSlidingWindowAggregator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SmoothQueueSlidingWindowAggregator.java
new file mode 100644
index 0000000000..0d82e0169d
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SmoothQueueSlidingWindowAggregator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.slidingwindow;
+
+import org.apache.iotdb.db.mpp.aggregation.Accumulator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+
+import java.util.List;
+
+/**
+ * The aggregation result is calculated from all pre-aggregation results in 
the currently maintained
+ * queue when calculating the COUNT, SUM, and AVG.
+ */
+public class SmoothQueueSlidingWindowAggregator extends 
SlidingWindowAggregator {
+  public SmoothQueueSlidingWindowAggregator(
+      Accumulator accumulator, List<InputLocation[]> inputLocationList, 
AggregationStep step) {
+    super(accumulator, inputLocationList, step);
+  }
+
+  @Override
+  protected void evictingExpiredValue() {
+    if (!deque.isEmpty() && !curTimeRange.contains(deque.getLast().getTime())) 
{
+      this.accumulator.reset();
+      return;
+    }
+    while (!deque.isEmpty() && 
!curTimeRange.contains(deque.getFirst().getTime())) {
+      PartialAggregationResult partialResult = deque.removeFirst();
+      this.accumulator.addIntermediate(partialResult.opposite());
+    }
+  }
+
+  @Override
+  public void processPartialResult(PartialAggregationResult partialResult) {
+    if (!partialResult.isNull()) {
+      deque.addLast(partialResult);
+      this.accumulator.addIntermediate(partialResult.getPartialResult());
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 23ba0d1b57..3b5beb32eb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -74,7 +74,7 @@ public class AggregationOperator implements ProcessOperator {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     tsBlockBuilder = new TsBlockBuilder(dataTypes);
-    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, 
ascending);
+    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, 
ascending, true);
   }
 
   @Override
@@ -128,7 +128,7 @@ public class AggregationOperator implements ProcessOperator 
{
 
   public static TsBlock updateResultTsBlockFromAggregators(
       TsBlockBuilder tsBlockBuilder,
-      List<Aggregator> aggregators,
+      List<? extends Aggregator> aggregators,
       ITimeRangeIterator timeRangeIterator) {
     tsBlockBuilder.reset();
     TimeColumnBuilder timeColumnBuilder = 
tsBlockBuilder.getTimeColumnBuilder();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index fa724b12ab..3753635e7f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -79,7 +79,7 @@ public class RawDataAggregationOperator implements 
ProcessOperator {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     tsBlockBuilder = new TsBlockBuilder(dataTypes);
-    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, 
ascending);
+    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, 
ascending, true);
   }
 
   @Override
@@ -98,7 +98,7 @@ public class RawDataAggregationOperator implements 
ProcessOperator {
     curTimeRange = timeRangeIterator.nextTimeRange();
     for (Aggregator aggregator : aggregators) {
       aggregator.reset();
-      aggregator.setTimeRange(curTimeRange);
+      aggregator.updateTimeRange(curTimeRange);
     }
 
     // 2. Calculate aggregation result based on current time window
@@ -170,7 +170,7 @@ public class RawDataAggregationOperator implements 
ProcessOperator {
     return tsBlock.subTsBlock(tsBlockIterator.getRowIndex());
   }
 
-  private boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean 
ascending) {
+  public static boolean satisfied(TsBlock tsBlock, TimeRange timeRange, 
boolean ascending) {
     TsBlockSingleColumnIterator tsBlockIterator = 
tsBlock.getTsBlockSingleColumnIterator();
     if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
       return false;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
new file mode 100644
index 0000000000..150699ccb3
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import 
org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregator;
+import 
org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator.updateResultTsBlockFromAggregators;
+import static 
org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator.satisfied;
+import static 
org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator.skipOutOfTimeRangePoints;
+import static 
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator.initTimeRangeIterator;
+
+public class SlidingWindowAggregationOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final Operator child;
+
+  private TsBlock cachedTsBlock;
+
+  private final List<SlidingWindowAggregator> aggregators;
+
+  private final ITimeRangeIterator timeRangeIterator;
+
+  private final boolean ascending;
+
+  private final TsBlockBuilder tsBlockBuilder;
+
+  public SlidingWindowAggregationOperator(
+      OperatorContext operatorContext,
+      List<SlidingWindowAggregator> aggregators,
+      Operator child,
+      boolean ascending,
+      GroupByTimeParameter groupByTimeParameter) {
+    checkArgument(
+        groupByTimeParameter != null,
+        "GroupByTimeParameter cannot be null in 
SlidingWindowAggregationOperator");
+
+    this.operatorContext = operatorContext;
+    this.aggregators = aggregators;
+    this.child = child;
+    List<TSDataType> outputDataTypes = new ArrayList<>();
+    for (Aggregator aggregator : aggregators) {
+      outputDataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+    }
+    this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, 
ascending, false);
+    this.ascending = ascending;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return timeRangeIterator.hasNextTimeRange();
+  }
+
+  @Override
+  public TsBlock next() {
+    // 1. Clear previous aggregation result
+    TimeRange curTimeRange = timeRangeIterator.nextTimeRange();
+    for (SlidingWindowAggregator aggregator : aggregators) {
+      aggregator.updateTimeRange(curTimeRange);
+    }
+
+    // 2. Calculate aggregation result based on current time window
+    while (!calcFromTsBlock(cachedTsBlock, curTimeRange)) {
+      if (child.hasNext()) {
+        cachedTsBlock = child.next();
+      } else {
+        cachedTsBlock = null;
+        break;
+      }
+    }
+
+    // 3. Update result using aggregators
+    return updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, 
timeRangeIterator);
+  }
+
+  private boolean calcFromTsBlock(TsBlock inputTsBlock, TimeRange timeRange) {
+    // check if the batchData does not contain points in current interval
+    if (inputTsBlock != null && satisfied(inputTsBlock, timeRange, ascending)) 
{
+      // skip points that cannot be calculated
+      inputTsBlock = skipOutOfTimeRangePoints(inputTsBlock, timeRange, 
ascending);
+      for (SlidingWindowAggregator aggregator : aggregators) {
+        aggregator.processTsBlock(inputTsBlock);
+      }
+    }
+    // The result is calculated from the cache
+    return inputTsBlock != null
+        && (ascending
+            ? inputTsBlock.getEndTime() > timeRange.getMax()
+            : inputTsBlock.getEndTime() < timeRange.getMin());
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return child.isBlocked();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !this.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    child.close();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
index 9ef84c165d..5187384560 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
@@ -94,7 +94,7 @@ public class AlignedSeriesAggregationScanOperator implements 
DataSourceOperator
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     tsBlockBuilder = new TsBlockBuilder(dataTypes);
-    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, 
ascending);
+    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, 
ascending, true);
   }
 
   @Override
@@ -125,7 +125,7 @@ public class AlignedSeriesAggregationScanOperator 
implements DataSourceOperator
       // 1. Clear previous aggregation result
       for (Aggregator aggregator : aggregators) {
         aggregator.reset();
-        aggregator.setTimeRange(curTimeRange);
+        aggregator.updateTimeRange(curTimeRange);
       }
 
       // 2. Calculate aggregation result based on current time window
@@ -315,11 +315,9 @@ public class AlignedSeriesAggregationScanOperator 
implements DataSourceOperator
       calcFromBatch(tsBlock, curTimeRange);
 
       // judge whether the calculation finished
-      if (isEndCalc(aggregators)
-          || (tsBlockIterator.hasNext()
-              && (ascending
-                  ? tsBlockIterator.currentTime() > curTimeRange.getMax()
-                  : tsBlockIterator.currentTime() < curTimeRange.getMin()))) {
+      if (isEndCalc(aggregators) || ascending
+          ? tsBlock.getEndTime() > curTimeRange.getMax()
+          : tsBlock.getEndTime() < curTimeRange.getMin()) {
         return true;
       }
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
index c234a7c2ef..807b1a2ebd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
@@ -101,7 +101,7 @@ public class SeriesAggregationScanOperator implements 
DataSourceOperator {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     tsBlockBuilder = new TsBlockBuilder(dataTypes);
-    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, 
ascending);
+    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, 
ascending, true);
   }
 
   /**
@@ -110,7 +110,7 @@ public class SeriesAggregationScanOperator implements 
DataSourceOperator {
    * timestamp, so it doesn't matter what the time range returns.
    */
   public static ITimeRangeIterator initTimeRangeIterator(
-      GroupByTimeParameter groupByTimeParameter, boolean ascending) {
+      GroupByTimeParameter groupByTimeParameter, boolean ascending, boolean 
isPreAggr) {
     if (groupByTimeParameter == null) {
       return new SingleTimeWindowIterator(0, Long.MAX_VALUE);
     } else {
@@ -123,7 +123,7 @@ public class SeriesAggregationScanOperator implements 
DataSourceOperator {
           groupByTimeParameter.isIntervalByMonth(),
           groupByTimeParameter.isSlidingStepByMonth(),
           groupByTimeParameter.isLeftCRightO(),
-          groupByTimeParameter.getInterval() > 
groupByTimeParameter.getSlidingStep());
+          isPreAggr);
     }
   }
 
@@ -155,7 +155,7 @@ public class SeriesAggregationScanOperator implements 
DataSourceOperator {
       // 1. Clear previous aggregation result
       for (Aggregator aggregator : aggregators) {
         aggregator.reset();
-        aggregator.setTimeRange(curTimeRange);
+        aggregator.updateTimeRange(curTimeRange);
       }
 
       // 2. Calculate aggregation result based on current time window
@@ -335,11 +335,9 @@ public class SeriesAggregationScanOperator implements 
DataSourceOperator {
       calcFromBatch(tsBlock, curTimeRange);
 
       // judge whether the calculation finished
-      if (isEndCalc(aggregators)
-          || (tsBlockIterator.hasNext()
-              && (ascending
-                  ? tsBlockIterator.currentTime() > curTimeRange.getMax()
-                  : tsBlockIterator.currentTime() < curTimeRange.getMin()))) {
+      if (isEndCalc(aggregators) || ascending
+          ? tsBlock.getEndTime() > curTimeRange.getMax()
+          : tsBlock.getEndTime() < curTimeRange.getMin()) {
         return true;
       }
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 9e7d99cae9..26d9306095 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import 
org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregator;
+import 
org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockManager;
 import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockService;
@@ -51,6 +53,7 @@ import 
org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
@@ -127,6 +130,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
@@ -784,6 +788,38 @@ public class LocalExecutionPlanner {
       return super.visitGroupByLevel(node, context);
     }
 
+    @Override
+    public Operator visitSlidingWindowAggregation(
+        SlidingWindowAggregationNode node, LocalExecutionPlanContext context) {
+      checkArgument(
+          node.getAggregationDescriptorList().size() >= 1,
+          "Aggregation descriptorList cannot be empty");
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              SlidingWindowAggregationOperator.class.getSimpleName());
+      Operator child = node.getChild().accept(this, context);
+      boolean ascending = node.getScanOrder() == OrderBy.TIMESTAMP_ASC;
+      List<SlidingWindowAggregator> aggregators = new ArrayList<>();
+      Map<String, List<InputLocation>> layout = makeLayout(node);
+      for (AggregationDescriptor descriptor : 
node.getAggregationDescriptorList()) {
+        List<InputLocation[]> inputLocationList = 
calcInputLocationList(descriptor, layout);
+        aggregators.add(
+            SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
+                descriptor.getAggregationType(),
+                context
+                    .getTypeProvider()
+                    // get the type of first inputExpression
+                    
.getType(descriptor.getInputExpressions().get(0).toString()),
+                ascending,
+                inputLocationList,
+                descriptor.getStep()));
+      }
+      return new SlidingWindowAggregationOperator(
+          operatorContext, aggregators, child, ascending, 
node.getGroupByTimeParameter());
+    }
+
     @Override
     public Operator visitLimit(LimitNode node, LocalExecutionPlanContext 
context) {
       Operator child = node.getChild().accept(this, context);
@@ -822,23 +858,7 @@ public class LocalExecutionPlanner {
       List<Aggregator> aggregators = new ArrayList<>();
       Map<String, List<InputLocation>> layout = makeLayout(node);
       for (AggregationDescriptor descriptor : 
node.getAggregationDescriptorList()) {
-        List<String> inputColumnNames = descriptor.getInputColumnNames();
-        // it may include double parts
-        List<List<InputLocation>> inputLocationParts = new 
ArrayList<>(inputColumnNames.size());
-        inputColumnNames.forEach(o -> inputLocationParts.add(layout.get(o)));
-
-        List<InputLocation[]> inputLocationList = new ArrayList<>();
-        for (int i = 0; i < inputLocationParts.get(0).size(); i++) {
-          if (inputColumnNames.size() == 1) {
-            inputLocationList.add(new InputLocation[] 
{inputLocationParts.get(0).get(i)});
-          } else {
-            inputLocationList.add(
-                new InputLocation[] {
-                  inputLocationParts.get(0).get(i), 
inputLocationParts.get(1).get(i)
-                });
-          }
-        }
-
+        List<InputLocation[]> inputLocationList = 
calcInputLocationList(descriptor, layout);
         aggregators.add(
             new Aggregator(
                 AccumulatorFactory.createAccumulator(
@@ -876,6 +896,27 @@ public class LocalExecutionPlanner {
       }
     }
 
+    private List<InputLocation[]> calcInputLocationList(
+        AggregationDescriptor descriptor, Map<String, List<InputLocation>> 
layout) {
+      List<String> inputColumnNames = descriptor.getInputColumnNames();
+      // it may include double parts
+      List<List<InputLocation>> inputLocationParts = new 
ArrayList<>(inputColumnNames.size());
+      inputColumnNames.forEach(o -> inputLocationParts.add(layout.get(o)));
+
+      List<InputLocation[]> inputLocationList = new ArrayList<>();
+      for (int i = 0; i < inputLocationParts.get(0).size(); i++) {
+        if (inputColumnNames.size() == 1) {
+          inputLocationList.add(new InputLocation[] 
{inputLocationParts.get(0).get(i)});
+        } else {
+          inputLocationList.add(
+              new InputLocation[] {
+                inputLocationParts.get(0).get(i), 
inputLocationParts.get(1).get(i)
+              });
+        }
+      }
+      return inputLocationList;
+    }
+
     @Override
     public Operator visitSort(SortNode node, LocalExecutionPlanContext 
context) {
       return super.visitSort(node, context);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index d60d210118..a22371f711 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -53,10 +53,10 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTimeNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
@@ -175,6 +175,7 @@ public class LogicalPlanBuilder {
             : AggregationStep.SINGLE;
 
     List<PlanNode> sourceNodeList = new ArrayList<>();
+    boolean needCheckAscending = groupByTimeParameter == null;
     Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new 
HashMap<>();
     Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new 
HashMap<>();
     for (Expression sourceExpression : sourceExpressions) {
@@ -189,7 +190,8 @@ public class LogicalPlanBuilder {
       }
       PartialPath selectPath =
           ((TimeSeriesOperand) 
sourceExpression.getExpressions().get(0)).getPath();
-      if (SchemaUtils.isConsistentWithScanOrder(aggregationFunction, 
scanOrder)) {
+      if (!needCheckAscending
+          || SchemaUtils.isConsistentWithScanOrder(aggregationFunction, 
scanOrder)) {
         ascendingAggregations
             .computeIfAbsent(selectPath, key -> new ArrayList<>())
             .add(aggregationDescriptor);
@@ -202,8 +204,6 @@ public class LogicalPlanBuilder {
 
     Map<PartialPath, List<AggregationDescriptor>> groupedAscendingAggregations 
=
         MetaUtils.groupAlignedAggregations(ascendingAggregations);
-    Map<PartialPath, List<AggregationDescriptor>> 
groupedDescendingAggregations =
-        MetaUtils.groupAlignedAggregations(descendingAggregations);
     for (Map.Entry<PartialPath, List<AggregationDescriptor>> 
pathAggregationsEntry :
         groupedAscendingAggregations.entrySet()) {
       sourceNodeList.add(
@@ -214,29 +214,38 @@ public class LogicalPlanBuilder {
               groupByTimeParameter,
               timeFilter));
     }
-    for (Map.Entry<PartialPath, List<AggregationDescriptor>> 
pathAggregationsEntry :
-        groupedDescendingAggregations.entrySet()) {
-      sourceNodeList.add(
-          createAggregationScanNode(
-              pathAggregationsEntry.getKey(),
-              pathAggregationsEntry.getValue(),
-              scanOrder,
-              groupByTimeParameter,
-              timeFilter));
+
+    if (needCheckAscending) {
+      Map<PartialPath, List<AggregationDescriptor>> 
groupedDescendingAggregations =
+          MetaUtils.groupAlignedAggregations(descendingAggregations);
+      for (Map.Entry<PartialPath, List<AggregationDescriptor>> 
pathAggregationsEntry :
+          groupedDescendingAggregations.entrySet()) {
+        sourceNodeList.add(
+            createAggregationScanNode(
+                pathAggregationsEntry.getKey(),
+                pathAggregationsEntry.getValue(),
+                scanOrder,
+                groupByTimeParameter,
+                timeFilter));
+      }
     }
 
     if (curStep.isOutputPartial()) {
       if (groupByTimeParameter != null && groupByTimeParameter.hasOverlap()) {
         curStep =
             groupByLevelExpressions != null ? AggregationStep.INTERMEDIATE : 
AggregationStep.FINAL;
+
+        this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
+
         this.root =
-            createGroupByTimeNode(
-                sourceNodeList, aggregationExpressions, groupByTimeParameter, 
curStep);
+            createSlidingWindowAggregationNode(
+                this.getRoot(), aggregationExpressions, groupByTimeParameter, 
curStep, scanOrder);
 
         if (groupByLevelExpressions != null) {
           curStep = AggregationStep.FINAL;
           this.root =
-              createGroupByTLevelNode(this.root.getChildren(), 
groupByLevelExpressions, curStep);
+              createGroupByTLevelNode(
+                  Collections.singletonList(this.getRoot()), 
groupByLevelExpressions, curStep);
         }
       } else {
         if (groupByLevelExpressions != null) {
@@ -311,7 +320,8 @@ public class LogicalPlanBuilder {
       Set<Expression> aggregationExpressions,
       GroupByTimeParameter groupByTimeParameter,
       AggregationStep curStep,
-      TypeProvider typeProvider) {
+      TypeProvider typeProvider,
+      OrderBy scanOrder) {
     if (aggregationExpressions == null) {
       return this;
     }
@@ -329,39 +339,40 @@ public class LogicalPlanBuilder {
             context.getQueryId().genPlanNodeId(),
             Collections.singletonList(this.getRoot()),
             aggregationDescriptorList,
-            groupByTimeParameter);
+            groupByTimeParameter,
+            scanOrder);
     return this;
   }
 
-  public LogicalPlanBuilder planGroupByTime(
+  public LogicalPlanBuilder planSlidingWindowAggregation(
       Set<Expression> aggregationExpressions,
       GroupByTimeParameter groupByTimeParameter,
-      AggregationStep curStep) {
+      AggregationStep curStep,
+      OrderBy scanOrder) {
     if (aggregationExpressions == null) {
       return this;
     }
 
     this.root =
-        createGroupByTimeNode(
-            Collections.singletonList(this.getRoot()),
-            aggregationExpressions,
-            groupByTimeParameter,
-            curStep);
+        createSlidingWindowAggregationNode(
+            this.getRoot(), aggregationExpressions, groupByTimeParameter, 
curStep, scanOrder);
     return this;
   }
 
-  private PlanNode createGroupByTimeNode(
-      List<PlanNode> children,
+  private PlanNode createSlidingWindowAggregationNode(
+      PlanNode child,
       Set<Expression> aggregationExpressions,
       GroupByTimeParameter groupByTimeParameter,
-      AggregationStep curStep) {
+      AggregationStep curStep,
+      OrderBy scanOrder) {
     List<AggregationDescriptor> aggregationDescriptorList =
         constructAggregationDescriptorList(aggregationExpressions, curStep);
-    return new GroupByTimeNode(
+    return new SlidingWindowAggregationNode(
         context.getQueryId().genPlanNodeId(),
-        children,
+        child,
         aggregationDescriptorList,
-        groupByTimeParameter);
+        groupByTimeParameter,
+        scanOrder);
   }
 
   private PlanNode createGroupByTLevelNode(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index 0e6da8c705..4950a6427f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -221,7 +221,8 @@ public class LogicalPlanner {
                   aggregationExpressions,
                   analysis.getGroupByTimeParameter(),
                   curStep,
-                  analysis.getTypeProvider());
+                  analysis.getTypeProvider(),
+                  queryStatement.getResultOrder());
 
           if (curStep.isOutputPartial()) {
             if (queryStatement.isGroupByTime() && 
analysis.getGroupByTimeParameter().hasOverlap()) {
@@ -230,8 +231,11 @@ public class LogicalPlanner {
                       ? AggregationStep.INTERMEDIATE
                       : AggregationStep.FINAL;
               planBuilder =
-                  planBuilder.planGroupByTime(
-                      aggregationExpressions, 
analysis.getGroupByTimeParameter(), curStep);
+                  planBuilder.planSlidingWindowAggregation(
+                      aggregationExpressions,
+                      analysis.getGroupByTimeParameter(),
+                      curStep,
+                      queryStatement.getResultOrder());
             }
 
             if (queryStatement.isGroupByLevel()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index c485ad136c..5d344f1f95 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -44,11 +44,11 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTimeNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
@@ -102,7 +102,7 @@ public enum PlanNodeType {
   TIME_SERIES_COUNT((short) 28),
   LEVEL_TIME_SERIES_COUNT((short) 29),
   COUNT_MERGE((short) 30),
-  GROUP_BY_TIME((short) 31),
+  SLIDING_WINDOW_AGGREGATION((short) 31),
   PROJECT((short) 32),
   ALIGNED_SERIES_SCAN((short) 33),
   ALIGNED_SERIES_AGGREGATE_SCAN((short) 34),
@@ -208,7 +208,7 @@ public enum PlanNodeType {
       case 30:
         return CountSchemaMergeNode.deserialize(buffer);
       case 31:
-        return GroupByTimeNode.deserialize(buffer);
+        return SlidingWindowAggregationNode.deserialize(buffer);
       case 32:
         return ProjectNode.deserialize(buffer);
       case 33:
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 2af623bfbe..5f5e09d228 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -44,11 +44,11 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTimeNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
@@ -114,7 +114,7 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
-  public R visitGroupByTime(GroupByTimeNode node, C context) {
+  public R visitSlidingWindowAggregation(SlidingWindowAggregationNode node, C 
context) {
     return visitPlan(node, context);
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index 70e21e3958..bfaf165b36 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -52,36 +52,33 @@ public class AggregationNode extends MultiChildNode {
   // The parameter of `group by time`.
   // Its value will be null if there is no `group by time` clause.
   @Nullable protected GroupByTimeParameter groupByTimeParameter;
-  protected OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
 
-  public AggregationNode(
-      PlanNodeId id,
-      List<PlanNode> children,
-      List<AggregationDescriptor> aggregationDescriptorList) {
-    this(id, children, aggregationDescriptorList, null);
-  }
+  protected OrderBy scanOrder;
 
   public AggregationNode(
       PlanNodeId id,
-      List<PlanNode> children,
       List<AggregationDescriptor> aggregationDescriptorList,
-      @Nullable GroupByTimeParameter groupByTimeParameter) {
-    super(id, children);
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      OrderBy scanOrder) {
+    super(id, new ArrayList<>());
     this.aggregationDescriptorList = 
getDeduplicatedDescriptors(aggregationDescriptorList);
     this.groupByTimeParameter = groupByTimeParameter;
-  }
-
-  public AggregationNode(PlanNodeId id, List<AggregationDescriptor> 
aggregationDescriptorList) {
-    this(id, aggregationDescriptorList, null);
+    this.scanOrder = scanOrder;
   }
 
   public AggregationNode(
       PlanNodeId id,
+      List<PlanNode> children,
       List<AggregationDescriptor> aggregationDescriptorList,
-      @Nullable GroupByTimeParameter groupByTimeParameter) {
-    super(id, new ArrayList<>());
-    this.aggregationDescriptorList = 
getDeduplicatedDescriptors(aggregationDescriptorList);
-    this.groupByTimeParameter = groupByTimeParameter;
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      OrderBy scanOrder) {
+    this(id, aggregationDescriptorList, groupByTimeParameter, scanOrder);
+    this.children = children;
+  }
+
+  @Deprecated
+  public AggregationNode(PlanNodeId id, List<AggregationDescriptor> 
aggregationDescriptorList) {
+    this(id, aggregationDescriptorList, null, OrderBy.TIMESTAMP_ASC);
   }
 
   public List<AggregationDescriptor> getAggregationDescriptorList() {
@@ -115,7 +112,7 @@ public class AggregationNode extends MultiChildNode {
   @Override
   public PlanNode clone() {
     return new AggregationNode(
-        getPlanNodeId(), getAggregationDescriptorList(), 
getGroupByTimeParameter());
+        getPlanNodeId(), getAggregationDescriptorList(), 
getGroupByTimeParameter(), getScanOrder());
   }
 
   @Override
@@ -148,6 +145,7 @@ public class AggregationNode extends MultiChildNode {
       ReadWriteIOUtils.write((byte) 1, byteBuffer);
       groupByTimeParameter.serialize(byteBuffer);
     }
+    ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
   }
 
   public static AggregationNode deserialize(ByteBuffer byteBuffer) {
@@ -162,8 +160,10 @@ public class AggregationNode extends MultiChildNode {
     if (isNull == 1) {
       groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
     }
+    OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new AggregationNode(planNodeId, aggregationDescriptorList, 
groupByTimeParameter);
+    return new AggregationNode(
+        planNodeId, aggregationDescriptorList, groupByTimeParameter, 
scanOrder);
   }
 
   @Override
@@ -178,15 +178,15 @@ public class AggregationNode extends MultiChildNode {
       return false;
     }
     AggregationNode that = (AggregationNode) o;
-    return aggregationDescriptorList.equals(that.aggregationDescriptorList)
+    return Objects.equals(aggregationDescriptorList, 
that.aggregationDescriptorList)
         && Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
-        && Objects.equals(children, that.children);
+        && scanOrder == that.scanOrder;
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(
-        super.hashCode(), aggregationDescriptorList, groupByTimeParameter, 
children);
+        super.hashCode(), aggregationDescriptorList, groupByTimeParameter, 
scanOrder);
   }
 
   /**
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTimeNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
similarity index 72%
rename from 
server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTimeNode.java
rename to 
server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
index 36a5bc2b9b..d0df526b93 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTimeNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
@@ -25,9 +25,10 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import javax.annotation.Nullable;
+import com.google.common.collect.ImmutableList;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -35,65 +36,75 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-public class GroupByTimeNode extends ProcessNode {
+public class SlidingWindowAggregationNode extends ProcessNode {
 
   // The list of aggregate functions, each AggregateDescriptor will be output 
as one column of
   // result TsBlock
   private final List<AggregationDescriptor> aggregationDescriptorList;
 
   // The parameter of `group by time`.
-  // Its value will be null if there is no `group by time` clause.
-  @Nullable private final GroupByTimeParameter groupByTimeParameter;
+  private final GroupByTimeParameter groupByTimeParameter;
 
-  private List<PlanNode> children;
+  protected OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
 
-  public GroupByTimeNode(
+  private PlanNode child;
+
+  public SlidingWindowAggregationNode(
       PlanNodeId id,
       List<AggregationDescriptor> aggregationDescriptorList,
-      @Nullable GroupByTimeParameter groupByTimeParameter) {
+      GroupByTimeParameter groupByTimeParameter,
+      OrderBy scanOrder) {
     super(id);
     this.aggregationDescriptorList = aggregationDescriptorList;
     this.groupByTimeParameter = groupByTimeParameter;
-    this.children = new ArrayList<>();
+    this.scanOrder = scanOrder;
   }
 
-  public GroupByTimeNode(
+  public SlidingWindowAggregationNode(
       PlanNodeId id,
-      List<PlanNode> children,
+      PlanNode child,
       List<AggregationDescriptor> aggregationDescriptorList,
-      GroupByTimeParameter groupByTimeParameter) {
-    this(id, aggregationDescriptorList, groupByTimeParameter);
-    this.children = children;
+      GroupByTimeParameter groupByTimeParameter,
+      OrderBy scanOrder) {
+    this(id, aggregationDescriptorList, groupByTimeParameter, scanOrder);
+    this.child = child;
   }
 
   public List<AggregationDescriptor> getAggregationDescriptorList() {
     return aggregationDescriptorList;
   }
 
-  @Nullable
   public GroupByTimeParameter getGroupByTimeParameter() {
     return groupByTimeParameter;
   }
 
+  public OrderBy getScanOrder() {
+    return scanOrder;
+  }
+
+  public PlanNode getChild() {
+    return child;
+  }
+
   @Override
   public List<PlanNode> getChildren() {
-    return children;
+    return ImmutableList.of(child);
   }
 
   @Override
   public void addChild(PlanNode child) {
-    this.children.add(child);
+    this.child = child;
   }
 
   @Override
   public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
+    return ONE_CHILD;
   }
 
   @Override
   public PlanNode clone() {
-    return new GroupByTimeNode(
-        getPlanNodeId(), getAggregationDescriptorList(), 
getGroupByTimeParameter());
+    return new SlidingWindowAggregationNode(
+        getPlanNodeId(), getAggregationDescriptorList(), 
getGroupByTimeParameter(), getScanOrder());
   }
 
   @Override
@@ -106,12 +117,12 @@ public class GroupByTimeNode extends ProcessNode {
 
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-    return visitor.visitGroupByTime(this, context);
+    return visitor.visitSlidingWindowAggregation(this, context);
   }
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
-    PlanNodeType.GROUP_BY_TIME.serialize(byteBuffer);
+    PlanNodeType.SLIDING_WINDOW_AGGREGATION.serialize(byteBuffer);
     ReadWriteIOUtils.write(aggregationDescriptorList.size(), byteBuffer);
     for (AggregationDescriptor aggregationDescriptor : 
aggregationDescriptorList) {
       aggregationDescriptor.serialize(byteBuffer);
@@ -122,9 +133,10 @@ public class GroupByTimeNode extends ProcessNode {
       ReadWriteIOUtils.write((byte) 1, byteBuffer);
       groupByTimeParameter.serialize(byteBuffer);
     }
+    ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
   }
 
-  public static GroupByTimeNode deserialize(ByteBuffer byteBuffer) {
+  public static SlidingWindowAggregationNode deserialize(ByteBuffer 
byteBuffer) {
     int descriptorSize = ReadWriteIOUtils.readInt(byteBuffer);
     List<AggregationDescriptor> aggregationDescriptorList = new ArrayList<>();
     while (descriptorSize > 0) {
@@ -136,8 +148,10 @@ public class GroupByTimeNode extends ProcessNode {
     if (isNull == 1) {
       groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
     }
+    OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new GroupByTimeNode(planNodeId, aggregationDescriptorList, 
groupByTimeParameter);
+    return new SlidingWindowAggregationNode(
+        planNodeId, aggregationDescriptorList, groupByTimeParameter, 
scanOrder);
   }
 
   @Override
@@ -151,15 +165,14 @@ public class GroupByTimeNode extends ProcessNode {
     if (!super.equals(o)) {
       return false;
     }
-    GroupByTimeNode that = (GroupByTimeNode) o;
+    SlidingWindowAggregationNode that = (SlidingWindowAggregationNode) o;
     return Objects.equals(aggregationDescriptorList, 
that.aggregationDescriptorList)
         && Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
-        && Objects.equals(children, that.children);
+        && Objects.equals(child, that.child);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(
-        super.hashCode(), aggregationDescriptorList, groupByTimeParameter, 
children);
+    return Objects.hash(super.hashCode(), aggregationDescriptorList, 
groupByTimeParameter, child);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 476ebbfb41..1a82376149 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.exception.StorageGroupNotReadyException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.exception.sql.SQLParserException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
@@ -119,6 +120,8 @@ public class ErrorHandlingUtils {
       return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(), 
rootCause.getMessage());
     } else if (t instanceof TsFileRuntimeException) {
       return RpcUtils.getStatus(TSStatusCode.TSFILE_PROCESSOR_ERROR, 
rootCause.getMessage());
+    } else if (t instanceof SemanticException) {
+      return RpcUtils.getStatus(TSStatusCode.SEMANTIC_ERROR, 
rootCause.getMessage());
     }
     return null;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
index ecd646ca6e..e0b8f0f020 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.utils;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -109,6 +110,10 @@ public class TypeInferenceUtils {
     if (aggrFuncName == null) {
       throw new IllegalArgumentException("AggregateFunction Name must not be 
null");
     }
+    if (!verifyIsAggregationDataTypeMatched(aggrFuncName, dataType)) {
+      throw new SemanticException(
+          "Aggregate functions [AVG, SUM, EXTREME, MIN_VALUE, MAX_VALUE] only 
support numeric data types [INT32, INT64, FLOAT, DOUBLE]");
+    }
 
     switch (aggrFuncName.toLowerCase()) {
       case SQLConstant.MIN_TIME:
@@ -128,4 +133,24 @@ public class TypeInferenceUtils {
         throw new IllegalArgumentException("Invalid Aggregation function: " + 
aggrFuncName);
     }
   }
+
+  private static boolean verifyIsAggregationDataTypeMatched(
+      String aggrFuncName, TSDataType dataType) {
+    switch (aggrFuncName.toLowerCase()) {
+      case SQLConstant.AVG:
+      case SQLConstant.SUM:
+      case SQLConstant.EXTREME:
+      case SQLConstant.MIN_VALUE:
+      case SQLConstant.MAX_VALUE:
+        return dataType.isNumeric();
+      case SQLConstant.COUNT:
+      case SQLConstant.MIN_TIME:
+      case SQLConstant.MAX_TIME:
+      case SQLConstant.FIRST_VALUE:
+      case SQLConstant.LAST_VALUE:
+        return true;
+      default:
+        throw new IllegalArgumentException("Invalid Aggregation function: " + 
aggrFuncName);
+    }
+  }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
index eaf58a6f22..35cd0e25bb 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
@@ -68,7 +68,7 @@ public class AggregationOperatorTest {
   private final List<TsFileResource> seqResources = new ArrayList<>();
   private final List<TsFileResource> unSeqResources = new ArrayList<>();
   private ExecutorService instanceNotificationExecutor =
-      IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");;
+      IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
 
   @Before
   public void setUp() throws MetadataException, IOException, 
WriteProcessException {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
new file mode 100644
index 0000000000..ee0deb4df5
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import 
org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregator;
+import 
org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.constant.TestConstant.count;
+import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+
+public class SlidingWindowAggregationOperatorTest {
+
+  private static final String AGGREGATION_OPERATOR_TEST_SG =
+      "root.SlidingWindowAggregationOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+  private ExecutorService instanceNotificationExecutor =
+      IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+
+  private final List<AggregationType> leafAggregationTypes =
+      Arrays.asList(
+          AggregationType.COUNT,
+          AggregationType.SUM,
+          AggregationType.LAST_VALUE,
+          AggregationType.FIRST_VALUE,
+          AggregationType.MAX_VALUE,
+          AggregationType.MIN_VALUE);
+
+  private final List<AggregationType> rootAggregationTypes =
+      Arrays.asList(
+          AggregationType.COUNT,
+          AggregationType.AVG,
+          AggregationType.SUM,
+          AggregationType.LAST_VALUE,
+          AggregationType.MIN_TIME,
+          AggregationType.MAX_TIME,
+          AggregationType.FIRST_VALUE,
+          AggregationType.MAX_VALUE,
+          AggregationType.MIN_VALUE);
+
+  private final List<List<List<InputLocation>>> inputLocations =
+      Arrays.asList(
+          Collections.singletonList(Collections.singletonList(new 
InputLocation(0, 0))),
+          Collections.singletonList(
+              Arrays.asList(new InputLocation(0, 0), new InputLocation(0, 1))),
+          Collections.singletonList(Collections.singletonList(new 
InputLocation(0, 1))),
+          Collections.singletonList(
+              Arrays.asList(new InputLocation(0, 2), new InputLocation(0, 3))),
+          Collections.singletonList(Collections.singletonList(new 
InputLocation(0, 5))),
+          Collections.singletonList(Collections.singletonList(new 
InputLocation(0, 3))),
+          Collections.singletonList(
+              Arrays.asList(new InputLocation(0, 4), new InputLocation(0, 5))),
+          Collections.singletonList(Collections.singletonList(new 
InputLocation(0, 6))),
+          Collections.singletonList(Collections.singletonList(new 
InputLocation(0, 7))));
+
+  private final GroupByTimeParameter groupByTimeParameter =
+      new GroupByTimeParameter(0, 300, 100, 50, true);
+
+  @Before
+  public void setUp() throws MetadataException, IOException, 
WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, 
AGGREGATION_OPERATOR_TEST_SG);
+    this.instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+    instanceNotificationExecutor.shutdown();
+  }
+
+  @Test
+  public void slidingWindowAggregationTest() throws IllegalPathException {
+    String[] retArray =
+        new String[] {
+          "0,100,20049.5,2004950.0,20099,0,99,20000,20099,20000",
+          "50,100,20099.5,2009950.0,20149,50,149,20050,20149,20050",
+          "100,100,20149.5,2014950.0,20199,100,199,20100,20199,20100",
+          "150,100,15199.5,1519950.0,10249,150,249,20150,20199,10200",
+          "200,100,6249.5,624950.0,299,200,299,10200,10259,260",
+          "250,50,2274.5,113725.0,299,250,299,10250,10259,260",
+        };
+
+    SlidingWindowAggregationOperator slidingWindowAggregationOperator1 =
+        initSlidingWindowAggregationOperator(true);
+    int count = 0;
+    while (slidingWindowAggregationOperator1.hasNext()) {
+      TsBlock resultTsBlock = slidingWindowAggregationOperator1.next();
+      Assert.assertEquals(rootAggregationTypes.size(), 
resultTsBlock.getValueColumnCount());
+      Assert.assertEquals(retArray[count], getResultString(resultTsBlock));
+      count++;
+    }
+    Assert.assertEquals(retArray.length, count);
+
+    SlidingWindowAggregationOperator slidingWindowAggregationOperator2 =
+        initSlidingWindowAggregationOperator(false);
+    while (slidingWindowAggregationOperator2.hasNext()) {
+      TsBlock resultTsBlock = slidingWindowAggregationOperator2.next();
+      Assert.assertEquals(rootAggregationTypes.size(), 
resultTsBlock.getValueColumnCount());
+      Assert.assertEquals(retArray[count - 1], getResultString(resultTsBlock));
+      count--;
+    }
+    Assert.assertEquals(0, count);
+  }
+
+  private String getResultString(TsBlock resultTsBlock) {
+    return resultTsBlock.getTimeColumn().getLong(0)
+        + ","
+        + resultTsBlock.getColumn(0).getLong(0)
+        + ","
+        + resultTsBlock.getColumn(1).getDouble(0)
+        + ","
+        + resultTsBlock.getColumn(2).getDouble(0)
+        + ","
+        + resultTsBlock.getColumn(3).getInt(0)
+        + ","
+        + resultTsBlock.getColumn(4).getLong(0)
+        + ","
+        + resultTsBlock.getColumn(5).getLong(0)
+        + ","
+        + resultTsBlock.getColumn(6).getInt(0)
+        + ","
+        + resultTsBlock.getColumn(7).getInt(0)
+        + ","
+        + resultTsBlock.getColumn(8).getInt(0);
+  }
+
+  private SlidingWindowAggregationOperator 
initSlidingWindowAggregationOperator(boolean ascending)
+      throws IllegalPathException {
+    // Construct operator tree
+    QueryId queryId = new QueryId("test");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    PlanNodeId sourceId = queryId.genPlanNodeId();
+    fragmentInstanceContext.addOperatorContext(
+        0, sourceId, SeriesAggregationScanOperator.class.getSimpleName());
+    fragmentInstanceContext.addOperatorContext(
+        1, queryId.genPlanNodeId(), 
SlidingWindowAggregationOperator.class.getSimpleName());
+
+    MeasurementPath d0s0 =
+        new MeasurementPath(AGGREGATION_OPERATOR_TEST_SG + ".device0.sensor0", 
TSDataType.INT32);
+
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(leafAggregationTypes, 
TSDataType.INT32, ascending)
+        .forEach(
+            accumulator -> aggregators.add(new Aggregator(accumulator, 
AggregationStep.PARTIAL)));
+
+    SeriesAggregationScanOperator seriesAggregationScanOperator =
+        new SeriesAggregationScanOperator(
+            sourceId,
+            d0s0,
+            Collections.singleton("sensor0"),
+            fragmentInstanceContext.getOperatorContexts().get(0),
+            aggregators,
+            null,
+            ascending,
+            groupByTimeParameter);
+    seriesAggregationScanOperator.initQueryDataSource(
+        new QueryDataSource(seqResources, unSeqResources));
+
+    List<SlidingWindowAggregator> finalAggregators = new ArrayList<>();
+    for (int i = 0; i < rootAggregationTypes.size(); i++) {
+      finalAggregators.add(
+          SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
+              rootAggregationTypes.get(i),
+              TSDataType.INT32,
+              ascending,
+              inputLocations.get(i).stream()
+                  .map(tmpInputLocations -> tmpInputLocations.toArray(new 
InputLocation[0]))
+                  .collect(Collectors.toList()),
+              AggregationStep.FINAL));
+    }
+
+    return new SlidingWindowAggregationOperator(
+        fragmentInstanceContext.getOperatorContexts().get(1),
+        finalAggregators,
+        seriesAggregationScanOperator,
+        ascending,
+        groupByTimeParameter);
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index e58bf4af47..46f7b07683 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -828,7 +828,9 @@ public class QueryLogicalPlanUtil {
                     AggregationType.LAST_VALUE,
                     AggregationStep.PARTIAL,
                     Collections.singletonList(
-                        new 
TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))));
+                        new 
TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))),
+            null,
+            OrderBy.TIMESTAMP_ASC);
 
     GroupByLevelNode groupByLevelNode =
         new GroupByLevelNode(
@@ -930,7 +932,9 @@ public class QueryLogicalPlanUtil {
                     AggregationType.LAST_VALUE,
                     AggregationStep.SINGLE,
                     Collections.singletonList(
-                        new 
TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))));
+                        new 
TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))),
+            null,
+            OrderBy.TIMESTAMP_DESC);
 
     List<PlanNode> sourceNodeList2 = new ArrayList<>();
     sourceNodeList2.add(
@@ -986,7 +990,9 @@ public class QueryLogicalPlanUtil {
                     AggregationType.LAST_VALUE,
                     AggregationStep.SINGLE,
                     Collections.singletonList(
-                        new 
TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))));
+                        new 
TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
+            null,
+            OrderBy.TIMESTAMP_DESC);
 
     Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
     deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2, 3));
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java
index 4a770db76a..7a42e4c85c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java
@@ -75,7 +75,8 @@ public class AggregationNodeSerdeTest {
                     AggregationStep.FINAL,
                     Collections.singletonList(
                         new TimeSeriesOperand(new 
PartialPath("root.sg.d1.s1"))))),
-            groupByTimeParameter);
+            groupByTimeParameter,
+            OrderBy.TIMESTAMP_ASC);
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     aggregationNode.serialize(byteBuffer);
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 6d72f1d7be..85ce2f54b9 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -84,6 +84,7 @@ public enum TSStatusCode {
   WRITE_PROCESS_REJECT(413),
   QUERY_ID_NOT_EXIST(414),
   SNAPSHOT_DIR_NOT_LEGAL(415),
+  SEMANTIC_ERROR(416),
 
   UNSUPPORTED_INDEX_FUNC_ERROR(421),
   UNSUPPORTED_INDEX_TYPE_ERROR(422),

Reply via email to