This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cabb8fff2f [IOTDB-3244] Implementation of LastOperator (#6001)
cabb8fff2f is described below

commit cabb8fff2f0d7538bbe5d980838078b1baee4343
Author: Jackie Tien <[email protected]>
AuthorDate: Wed May 25 19:39:28 2022 +0800

    [IOTDB-3244] Implementation of LastOperator (#6001)
---
 .../db/mpp/execution/operator/LastQueryUtil.java   |  79 +++++
 .../operator/process/LastQueryMergeOperator.java   |  82 ++++++
 .../operator/process/UpdateLastCacheOperator.java  | 127 ++++++++
 .../operator/source/LastCacheScanOperator.java     |  64 ++++
 .../db/mpp/plan/planner/LocalExecutionPlanner.java | 235 +++++++++++++++
 .../plan/node/process/LastQueryMergeNode.java      |  32 +-
 .../plan/node/source/AlignedLastQueryScanNode.java |  42 +--
 .../plan/node/source/LastQueryScanNode.java        |  42 +--
 .../iotdb/db/query/executor/LastQueryExecutor.java |   5 +-
 .../operator/LastCacheScanOperatorTest.java        |  93 ++++++
 .../operator/LastQueryMergeOperatorTest.java       | 327 +++++++++++++++++++++
 .../SeriesAggregationScanOperatorTest.java         |   3 +-
 .../operator/UpdateLastCacheOperatorTest.java      | 219 ++++++++++++++
 13 files changed, 1266 insertions(+), 84 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
new file mode 100644
index 0000000000..19e9571171
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.LastValueDescAccumulator;
+import org.apache.iotdb.db.mpp.aggregation.MaxTimeDescAccumulator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.Gt;
+import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Static helpers shared by the operators and the planner that serve the last query. */
+public class LastQueryUtil {
+
+  private LastQueryUtil() {
+    // static helpers only, never instantiated
+  }
+
+  /**
+   * Creates a builder for last query result blocks, i.e. blocks with three TEXT value columns
+   * that {@link #appendLastValue} fills with timeseries, value and dataType.
+   *
+   * @return a new builder with default capacity
+   */
+  public static TsBlockBuilder createTsBlockBuilder() {
+    return new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+  }
+
+  /**
+   * Same as {@link #createTsBlockBuilder()}, but passes a size hint to the builder.
+   *
+   * @param initialExpectedEntries number of rows the caller expects to append
+   * @return a new builder
+   */
+  public static TsBlockBuilder createTsBlockBuilder(int initialExpectedEntries) {
+    return new TsBlockBuilder(
+        initialExpectedEntries,
+        ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+  }
+
+  /**
+   * Appends one last query row to {@code builder}.
+   *
+   * @param builder builder whose value columns are (timeseries, value, dataType)
+   * @param lastTime timestamp of the last point, written to the time column
+   * @param fullPath full path of the timeseries, written to column 0
+   * @param lastValue string form of the last value, written to column 1
+   * @param dataType name of the data type, written to column 2
+   */
+  public static void appendLastValue(
+      TsBlockBuilder builder, long lastTime, String fullPath, String lastValue, String dataType) {
+    // Time
+    builder.getTimeColumnBuilder().writeLong(lastTime);
+    // timeseries
+    builder.getColumnBuilder(0).writeBinary(new Binary(fullPath));
+    // value
+    builder.getColumnBuilder(1).writeBinary(new Binary(lastValue));
+    // dataType
+    builder.getColumnBuilder(2).writeBinary(new Binary(dataType));
+    // one complete row has been written
+    builder.declarePosition();
+  }
+
+  /**
+   * Checks a time-value pair against an optional filter.
+   *
+   * @param filter filter to apply, {@code null} means "no filter"
+   * @param tvPair pair to check
+   * @return {@code true} if there is no filter or the pair satisfies it
+   */
+  public static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
+    return filter == null || filter.satisfy(tvPair.getTimestamp(), tvPair.getValue().getValue());
+  }
+
+  /**
+   * Creates the two aggregators used to compute the last point of a series.
+   *
+   * @param dataType data type of the series, handed to the last_value accumulator
+   * @return a list holding the max_time aggregator followed by the last_value aggregator
+   */
+  public static List<Aggregator> createAggregators(TSDataType dataType) {
+    // max_time, last_value
+    List<Aggregator> aggregators = new ArrayList<>(2);
+    aggregators.add(new Aggregator(new MaxTimeDescAccumulator(), AggregationStep.SINGLE));
+    aggregators.add(new Aggregator(new LastValueDescAccumulator(dataType), AggregationStep.SINGLE));
+    return aggregators;
+  }
+
+  /**
+   * Decides whether the result of a last query may be written into the last cache.
+   *
+   * @param timeFilter time filter of the query, may be {@code null}
+   * @return {@code true} only if the filter is a {@link Gt} or a {@link GtEq}
+   */
+  public static boolean needUpdateCache(Filter timeFilter) {
+    // Update the cache only when the filter is gt (greater than) or ge (greater than or equal to)
+    return (timeFilter instanceof GtEq) || (timeFilter instanceof Gt);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
new file mode 100644
index 0000000000..261abdeb12
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+
+/**
+ * Emits the output of its children one child after another: blocks of the current child are
+ * forwarded as they are, and once the current child reports no more data the operator moves on
+ * to the following child.
+ */
+public class LastQueryMergeOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+
+  private final List<Operator> children;
+
+  private final int inputOperatorsCount;
+
+  // index of the child currently being consumed, equals inputOperatorsCount once all are drained
+  private int currentIndex;
+
+  /**
+   * @param operatorContext context of this operator
+   * @param children operators whose output is emitted in list order
+   */
+  public LastQueryMergeOperator(OperatorContext operatorContext, List<Operator> children) {
+    this.operatorContext = operatorContext;
+    this.children = children;
+    this.inputOperatorsCount = children.size();
+    this.currentIndex = 0;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /**
+   * @return the blocked future of the current child, or an already completed future if there is
+   *     no child left to consume
+   */
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    // when every child is drained (or the child list is empty) currentIndex is out of range, so
+    // report "not blocked" instead of failing with an IndexOutOfBoundsException
+    if (currentIndex >= inputOperatorsCount) {
+      return Futures.immediateFuture(null);
+    }
+    return children.get(currentIndex).isBlocked();
+  }
+
+  /**
+   * @return the next block of the current child, or {@code null} when the current child is
+   *     exhausted; in that case the operator has advanced to the following child
+   */
+  @Override
+  public TsBlock next() {
+    if (children.get(currentIndex).hasNext()) {
+      return children.get(currentIndex).next();
+    } else {
+      currentIndex++;
+      return null;
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return currentIndex < inputOperatorsCount;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !hasNext();
+  }
+
+  /** Closes every child operator. */
+  @Override
+  public void close() throws Exception {
+    for (Operator child : children) {
+      child.close();
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
new file mode 100644
index 0000000000..ee0eb898a8
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Turns the single-row result of its child (column 0 read as last time, column 1 as last value)
+ * into a last query row built by {@link LastQueryUtil#appendLastValue} and, when requested,
+ * writes that last point into the last cache.
+ */
+public class UpdateLastCacheOperator implements ProcessOperator {
+
+  // shared result for "the child produced an empty block"
+  private static final TsBlock LAST_QUERY_EMPTY_TSBLOCK =
+      new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT))
+          .build();
+
+  private final OperatorContext operatorContext;
+
+  private final Operator child;
+
+  // fullPath for queried time series
+  private final PartialPath fullPath;
+
+  // dataType name for queried time series
+  private final String dataType;
+
+  private final DataNodeSchemaCache lastCache;
+
+  private final boolean needUpdateCache;
+
+  // reused for every result block, reset before each use
+  private final TsBlockBuilder tsBlockBuilder;
+
+  /**
+   * @param operatorContext context of this operator
+   * @param child operator producing the (last time, last value) row
+   * @param fullPath path of the queried time series
+   * @param dataType data type of the queried time series; its name is put into the result
+   * @param dataNodeSchemaCache cache that receives the last point
+   * @param needUpdateCache whether the last point is written into the cache
+   */
+  public UpdateLastCacheOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      PartialPath fullPath,
+      TSDataType dataType,
+      DataNodeSchemaCache dataNodeSchemaCache,
+      boolean needUpdateCache) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.fullPath = fullPath;
+    this.dataType = dataType.name();
+    this.lastCache = dataNodeSchemaCache;
+    this.needUpdateCache = needUpdateCache;
+    this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return child.isBlocked();
+  }
+
+  /**
+   * @return {@code null} if the child returned {@code null}, an empty block if the child
+   *     returned an empty block or a null entry, otherwise a block with exactly one last query
+   *     row
+   * @throws IllegalArgumentException if the child returned more than one row
+   */
+  @Override
+  public TsBlock next() {
+    TsBlock res = child.next();
+    if (res == null) {
+      return null;
+    }
+    if (res.isEmpty()) {
+      return LAST_QUERY_EMPTY_TSBLOCK;
+    }
+
+    checkArgument(res.getPositionCount() == 1, "last query result should only have one record");
+
+    // NOTE(review): assumes the child signals "no point found" with a null entry in the
+    // aggregation result - confirm against the accumulators; a null entry must neither be
+    // cached nor returned as a last value
+    if (res.getColumn(0).isNull(0) || res.getColumn(1).isNull(0)) {
+      return LAST_QUERY_EMPTY_TSBLOCK;
+    }
+
+    long lastTime = res.getColumn(0).getLong(0);
+    TsPrimitiveType lastValue = res.getColumn(1).getTsPrimitiveType(0);
+
+    if (needUpdateCache) {
+      TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue);
+      lastCache.updateLastCache(fullPath, timeValuePair, false, Long.MIN_VALUE);
+    }
+
+    tsBlockBuilder.reset();
+
+    LastQueryUtil.appendLastValue(
+        tsBlockBuilder, lastTime, fullPath.getFullPath(), lastValue.getStringValue(), dataType);
+
+    return tsBlockBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return child.hasNext();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return child.isFinished();
+  }
+
+  @Override
+  public void close() throws Exception {
+    child.close();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
new file mode 100644
index 0000000000..dfb6d82c5c
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+/** Source operator that hands out a single, already built {@link TsBlock} exactly once. */
+public class LastCacheScanOperator implements SourceOperator {
+
+  private final OperatorContext operatorContext;
+  private final PlanNodeId sourceId;
+  // block still to be handed out, null once next() has returned it
+  private TsBlock tsBlock;
+
+  /**
+   * @param operatorContext context of this operator
+   * @param sourceId id reported by {@link #getSourceId()}
+   * @param tsBlock block returned by the first call of {@link #next()}
+   */
+  public LastCacheScanOperator(
+      OperatorContext operatorContext, PlanNodeId sourceId, TsBlock tsBlock) {
+    this.operatorContext = operatorContext;
+    this.sourceId = sourceId;
+    this.tsBlock = tsBlock;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /** Returns the pending block and forgets it, so that later calls return {@code null}. */
+  @Override
+  public TsBlock next() {
+    final TsBlock pending = tsBlock;
+    tsBlock = null;
+    return pending;
+  }
+
+  /** There is something to return only while a non-empty block is still pending. */
+  @Override
+  public boolean hasNext() {
+    if (tsBlock == null) {
+      return false;
+    }
+    return !tsBlock.isEmpty();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !hasNext();
+  }
+
+  @Override
+  public PlanNodeId getSourceId() {
+    return sourceId;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 736bc5feef..9e7d99cae9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.mpp.plan.planner;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
@@ -35,6 +37,7 @@ import 
org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
@@ -42,6 +45,7 @@ import 
org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FilterOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -49,6 +53,7 @@ import 
org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BooleanConstantFill;
@@ -90,11 +95,13 @@ import 
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregatio
 import 
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildNodesSchemaScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildPathsSchemaScanNode;
@@ -117,14 +124,17 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
@@ -136,7 +146,11 @@ import 
org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.Gt;
+import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
 
 import org.apache.commons.lang3.Validate;
 
@@ -148,11 +162,13 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
 
 /**
  * Used to plan a fragment instance. Currently, we simply change it from 
PlanNode to executable
@@ -164,6 +180,9 @@ public class LocalExecutionPlanner {
   private static final DataBlockManager DATA_BLOCK_MANAGER =
       DataBlockService.getInstance().getDataBlockManager();
 
+  private static final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE =
+      DataNodeSchemaCache.getInstance();
+
   private static final TimeComparator ASC_TIME_COMPARATOR = new 
AscTimeComparator();
 
   private static final TimeComparator DESC_TIME_COMPARATOR = new 
DescTimeComparator();
@@ -984,6 +1003,187 @@ public class LocalExecutionPlanner {
           ((SchemaDriverContext) 
(context.instanceContext.getDriverContext())).getSchemaRegion());
     }
 
+    /**
+     * Plans a LastQueryScanNode with the help of the last cache.
+     *
+     * @return an UpdateLastCacheOperator when the series has to be read, or {@code null} when
+     *     the cache already answers the query (the cached value is then either registered in
+     *     the context or known not to match the filter)
+     */
+    @Override
+    public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) {
+      TimeValuePair cachedLast = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+      // last value is not cached
+      if (cachedLast == null) {
+        return createUpdateLastCacheOperator(node, context);
+      }
+
+      Filter timeFilter = context.lastQueryTimeFilter;
+      // cached last value is satisfied, put it into LastCacheScanOperator
+      if (satisfyFilter(timeFilter, cachedLast)) {
+        context.addCachedLastValue(
+            cachedLast, node.getPlanNodeId(), node.getSeriesPath().getFullPath());
+        return null;
+      }
+
+      // cached last value is not satisfied: with a > or >= time filter we just ignore the
+      // series and return null
+      if (timeFilter instanceof Gt || timeFilter instanceof GtEq) {
+        return null;
+      }
+      // time filter is not > or >=, we still need to read from disk
+      return createUpdateLastCacheOperator(node, context);
+    }
+
+    /**
+     * Builds the operator pair for a non-aligned series: a last query scan operator wrapped in
+     * an UpdateLastCacheOperator.
+     */
+    private UpdateLastCacheOperator createUpdateLastCacheOperator(
+        LastQueryScanNode node, LocalExecutionPlanContext context) {
+      // the child scan operator is built before this operator's own context is registered
+      SeriesAggregationScanOperator child = createLastQueryScanOperator(node, context);
+
+      OperatorContext updateContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              UpdateLastCacheOperator.class.getSimpleName());
+      MeasurementPath seriesPath = node.getSeriesPath();
+
+      return new UpdateLastCacheOperator(
+          updateContext,
+          child,
+          seriesPath,
+          seriesPath.getSeriesType(),
+          DATA_NODE_SCHEMA_CACHE,
+          context.needUpdateLastCache);
+    }
+
+    /**
+     * Builds the aggregation scan operator that computes max_time and last_value of a
+     * non-aligned series, and registers it and its path in the context.
+     */
+    private SeriesAggregationScanOperator createLastQueryScanOperator(
+        LastQueryScanNode node, LocalExecutionPlanContext context) {
+      MeasurementPath seriesPath = node.getSeriesPath();
+      PlanNodeId planNodeId = node.getPlanNodeId();
+      OperatorContext scanContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              planNodeId,
+              SeriesAggregationScanOperator.class.getSimpleName());
+
+      // max_time, last_value
+      List<Aggregator> lastAggregators =
+          LastQueryUtil.createAggregators(seriesPath.getSeriesType());
+
+      SeriesAggregationScanOperator scanOperator =
+          new SeriesAggregationScanOperator(
+              planNodeId,
+              seriesPath,
+              context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
+              scanContext,
+              lastAggregators,
+              context.lastQueryTimeFilter,
+              false,
+              null);
+
+      context.addSourceOperator(scanOperator);
+      context.addPath(seriesPath);
+      return scanOperator;
+    }
+
+    /**
+     * Plans an AlignedLastQueryScanNode with the help of the last cache.
+     *
+     * @return an UpdateLastCacheOperator when the series has to be read, or {@code null} when
+     *     the cache already answers the query (the cached value is then either registered in
+     *     the context or known not to match the filter)
+     */
+    @Override
+    public Operator visitAlignedLastQueryScan(
+        AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+      TimeValuePair cachedLast = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+      // last value is not cached
+      if (cachedLast == null) {
+        return createUpdateLastCacheOperator(node, context);
+      }
+
+      Filter timeFilter = context.lastQueryTimeFilter;
+      // cached last value is satisfied, put it into LastCacheScanOperator
+      if (satisfyFilter(timeFilter, cachedLast)) {
+        context.addCachedLastValue(
+            cachedLast, node.getPlanNodeId(), node.getSeriesPath().getFullPath());
+        return null;
+      }
+
+      // cached last value is not satisfied: with a > or >= time filter we just ignore the
+      // series and return null
+      if (timeFilter instanceof Gt || timeFilter instanceof GtEq) {
+        return null;
+      }
+      // time filter is not > or >=, we still need to read from disk
+      return createUpdateLastCacheOperator(node, context);
+    }
+
+    /**
+     * Builds the operator pair for an aligned series: an aligned last query scan operator
+     * wrapped in an UpdateLastCacheOperator.
+     */
+    private UpdateLastCacheOperator createUpdateLastCacheOperator(
+        AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+      // the child scan operator is built before this operator's own context is registered
+      AlignedSeriesAggregationScanOperator child = createLastQueryScanOperator(node, context);
+
+      OperatorContext updateContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              UpdateLastCacheOperator.class.getSimpleName());
+      AlignedPath seriesPath = node.getSeriesPath();
+
+      return new UpdateLastCacheOperator(
+          updateContext,
+          child,
+          seriesPath,
+          seriesPath.getSeriesType(),
+          DATA_NODE_SCHEMA_CACHE,
+          context.needUpdateLastCache);
+    }
+
+    /**
+     * Builds the aggregation scan operator that computes max_time and last_value of an aligned
+     * series, and registers it and its path in the context.
+     */
+    private AlignedSeriesAggregationScanOperator createLastQueryScanOperator(
+        AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+      AlignedPath seriesPath = node.getSeriesPath();
+      PlanNodeId planNodeId = node.getPlanNodeId();
+      OperatorContext scanContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              planNodeId,
+              AlignedSeriesAggregationScanOperator.class.getSimpleName());
+
+      // max_time, last_value
+      List<Aggregator> lastAggregators =
+          LastQueryUtil.createAggregators(seriesPath.getSeriesType());
+
+      AlignedSeriesAggregationScanOperator scanOperator =
+          new AlignedSeriesAggregationScanOperator(
+              planNodeId,
+              seriesPath,
+              scanContext,
+              lastAggregators,
+              context.lastQueryTimeFilter,
+              false,
+              null);
+
+      context.addSourceOperator(scanOperator);
+      context.addPath(seriesPath);
+      return scanOperator;
+    }
+
+    /**
+     * Plans a LastQueryMergeNode: stores the time filter in the context, plans every child, and
+     * merges the resulting operators with one extra LastCacheScanOperator that carries all
+     * values served from the last cache.
+     *
+     * @return a LastQueryMergeOperator over the planned children (possibly with no child at all)
+     */
+    @Override
+    public Operator visitLastQueryMerge(
+        LastQueryMergeNode node, LocalExecutionPlanContext context) {
+
+      context.setLastQueryTimeFilter(node.getTimeFilter());
+      context.setNeedUpdateLastCache(LastQueryUtil.needUpdateCache(node.getTimeFilter()));
+
+      // children answered from the last cache are planned as null and dropped here; collect
+      // into an ArrayList explicitly because the list is appended to below and
+      // Collectors.toList() gives no guarantee that its result is mutable
+      List<Operator> operatorList =
+          node.getChildren().stream()
+              .map(child -> child.accept(this, context))
+              .filter(Objects::nonNull)
+              .collect(Collectors.toCollection(ArrayList::new));
+
+      List<TimeValuePair> cachedLastValueList = context.getCachedLastValueList();
+
+      if (cachedLastValueList != null && !cachedLastValueList.isEmpty()) {
+        // one row per cached value; paths are stored at the same index as the values
+        TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(cachedLastValueList.size());
+        for (int i = 0; i < cachedLastValueList.size(); i++) {
+          TimeValuePair timeValuePair = cachedLastValueList.get(i);
+          String fullPath = context.cachedLastValuePathList.get(i);
+          LastQueryUtil.appendLastValue(
+              builder,
+              timeValuePair.getTimestamp(),
+              fullPath,
+              timeValuePair.getValue().getStringValue(),
+              timeValuePair.getValue().getDataType().name());
+        }
+
+        LastCacheScanOperator operator =
+            new LastCacheScanOperator(
+                context.instanceContext.addOperatorContext(
+                    context.getNextOperatorId(),
+                    context.firstCachedPlanNodeId,
+                    LastCacheScanOperator.class.getSimpleName()),
+                context.firstCachedPlanNodeId,
+                builder.build());
+        operatorList.add(operator);
+      }
+
+      return new LastQueryMergeOperator(
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              LastQueryMergeOperator.class.getSimpleName()),
+          operatorList);
+    }
+
     private Map<String, List<InputLocation>> makeLayout(PlanNode node) {
       Map<String, List<InputLocation>> outputMappings = new LinkedHashMap<>();
       int tsBlockIndex = 0;
@@ -1044,6 +1244,18 @@ public class LocalExecutionPlanner {
 
     private TypeProvider typeProvider;
 
+    // cached last value in last query
+    private List<TimeValuePair> cachedLastValueList;
+    // full path for each cached last value, this size should be equal to 
cachedLastValueList
+    private List<String> cachedLastValuePathList;
+    // PlanNodeId of the first LastQueryScanNode/AlignedLastQueryScanNode whose last value was
+    // served from the cache, it's used as the sourceId of LastCacheScanOperator
+    private PlanNodeId firstCachedPlanNodeId;
+    // timeFilter for last query
+    private Filter lastQueryTimeFilter;
+    // whether we need to update last cache
+    private boolean needUpdateLastCache;
+
     public LocalExecutionPlanContext(
         TypeProvider typeProvider, FragmentInstanceContext instanceContext) {
       this.typeProvider = typeProvider;
@@ -1086,6 +1298,29 @@ public class LocalExecutionPlanner {
       sourceOperators.add(sourceOperator);
     }
 
+    /** Stores the time filter of the last query that is being planned. */
+    public void setLastQueryTimeFilter(Filter lastQueryTimeFilter) {
+      this.lastQueryTimeFilter = lastQueryTimeFilter;
+    }
+
+    /** Stores whether operators planned for the last query should update the last cache. */
+    public void setNeedUpdateLastCache(boolean needUpdateLastCache) {
+      this.needUpdateLastCache = needUpdateLastCache;
+    }
+
+    /**
+     * Records a last value that was served from the cache together with its full path.
+     *
+     * @param timeValuePair cached last value
+     * @param planNodeId id of the plan node the value belongs to; only the id passed on the
+     *     first call is kept
+     * @param fullPath full path of the timeseries
+     */
+    public void addCachedLastValue(
+        TimeValuePair timeValuePair, PlanNodeId planNodeId, String fullPath) {
+      // first cached value: create both lists lazily and remember its plan node id
+      boolean firstCachedValue = (cachedLastValueList == null);
+      if (firstCachedValue) {
+        cachedLastValueList = new ArrayList<>();
+        cachedLastValuePathList = new ArrayList<>();
+        firstCachedPlanNodeId = planNodeId;
+      }
+      // both lists are appended together so that indexes stay in sync
+      cachedLastValueList.add(timeValuePair);
+      cachedLastValuePathList.add(fullPath);
+    }
+
+    /**
+     * @return the last values recorded by {@code addCachedLastValue}; the list is only created
+     *     by the first such call, so callers should be prepared for {@code null}
+     */
+    public List<TimeValuePair> getCachedLastValueList() {
+      return this.cachedLastValueList;
+    }
+
     public ISinkHandle getSinkHandle() {
       return sinkHandle;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
index e8223a42d0..5923bc8942 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
@@ -22,6 +22,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.Nullable;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -35,14 +40,18 @@ public class LastQueryMergeNode extends ProcessNode {
   // make sure child in list has been ordered by their sensor name
   private List<PlanNode> children;
 
-  public LastQueryMergeNode(PlanNodeId id) {
+  private final Filter timeFilter;
+
+  public LastQueryMergeNode(PlanNodeId id, Filter timeFilter) {
     super(id);
     this.children = new ArrayList<>();
+    this.timeFilter = timeFilter;
   }
 
-  public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children) {
+  public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children, Filter 
timeFilter) {
     super(id);
     this.children = children;
+    this.timeFilter = timeFilter;
   }
 
   @Override
@@ -57,7 +66,7 @@ public class LastQueryMergeNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new LastQueryMergeNode(getPlanNodeId());
+    return new LastQueryMergeNode(getPlanNodeId(), timeFilter);
   }
 
   @Override
@@ -97,14 +106,29 @@ public class LastQueryMergeNode extends ProcessNode {
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.LAST_QUERY_MERGE.serialize(byteBuffer);
+    if (timeFilter == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      timeFilter.serialize(byteBuffer);
+    }
   }
 
   public static LastQueryMergeNode deserialize(ByteBuffer byteBuffer) {
+    Filter timeFilter = null;
+    if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
+      timeFilter = FilterFactory.deserialize(byteBuffer);
+    }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new LastQueryMergeNode(planNodeId);
+    return new LastQueryMergeNode(planNodeId, timeFilter);
   }
 
   public void setChildren(List<PlanNode> children) {
     this.children = children;
   }
+
+  @Nullable
+  public Filter getTimeFilter() {
+    return timeFilter;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
index 837bdbace4..dced79aa0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
@@ -25,14 +25,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import com.google.common.collect.ImmutableList;
 
-import javax.annotation.Nullable;
-
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
@@ -43,25 +38,18 @@ public class AlignedLastQueryScanNode extends SourceNode {
   // The path of the target series which will be scanned.
   private final AlignedPath seriesPath;
 
-  private Filter timeFilter;
-
   // The id of DataRegion where the node will run
   private TRegionReplicaSet regionReplicaSet;
 
-  public AlignedLastQueryScanNode(PlanNodeId id, AlignedPath seriesPath, 
Filter timeFilter) {
+  public AlignedLastQueryScanNode(PlanNodeId id, AlignedPath seriesPath) {
     super(id);
     this.seriesPath = seriesPath;
-    this.timeFilter = timeFilter;
   }
 
   public AlignedLastQueryScanNode(
-      PlanNodeId id,
-      AlignedPath seriesPath,
-      Filter timeFilter,
-      TRegionReplicaSet regionReplicaSet) {
+      PlanNodeId id, AlignedPath seriesPath, TRegionReplicaSet 
regionReplicaSet) {
     super(id);
     this.seriesPath = seriesPath;
-    this.timeFilter = timeFilter;
     this.regionReplicaSet = regionReplicaSet;
   }
 
@@ -93,7 +81,7 @@ public class AlignedLastQueryScanNode extends SourceNode {
 
   @Override
   public PlanNode clone() {
-    return new AlignedLastQueryScanNode(getPlanNodeId(), seriesPath, 
timeFilter, regionReplicaSet);
+    return new AlignedLastQueryScanNode(getPlanNodeId(), seriesPath, 
regionReplicaSet);
   }
 
   @Override
@@ -118,13 +106,12 @@ public class AlignedLastQueryScanNode extends SourceNode {
     if (!super.equals(o)) return false;
     AlignedLastQueryScanNode that = (AlignedLastQueryScanNode) o;
     return Objects.equals(seriesPath, that.seriesPath)
-        && Objects.equals(timeFilter, that.timeFilter)
         && Objects.equals(regionReplicaSet, that.regionReplicaSet);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), seriesPath, timeFilter, 
regionReplicaSet);
+    return Objects.hash(super.hashCode(), seriesPath, regionReplicaSet);
   }
 
   @Override
@@ -138,34 +125,15 @@ public class AlignedLastQueryScanNode extends SourceNode {
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.ALIGNED_LAST_QUERY_SCAN.serialize(byteBuffer);
     seriesPath.serialize(byteBuffer);
-    if (timeFilter == null) {
-      ReadWriteIOUtils.write((byte) 0, byteBuffer);
-    } else {
-      ReadWriteIOUtils.write((byte) 1, byteBuffer);
-      timeFilter.serialize(byteBuffer);
-    }
   }
 
   public static AlignedLastQueryScanNode deserialize(ByteBuffer byteBuffer) {
     AlignedPath partialPath = (AlignedPath) 
PathDeserializeUtil.deserialize(byteBuffer);
-    Filter timeFilter = null;
-    if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
-      timeFilter = FilterFactory.deserialize(byteBuffer);
-    }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new AlignedLastQueryScanNode(planNodeId, partialPath, timeFilter);
+    return new AlignedLastQueryScanNode(planNodeId, partialPath);
   }
 
   public AlignedPath getSeriesPath() {
     return seriesPath;
   }
-
-  @Nullable
-  public Filter getTimeFilter() {
-    return timeFilter;
-  }
-
-  public void setTimeFilter(@Nullable Filter timeFilter) {
-    this.timeFilter = timeFilter;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
index 546c357e82..ab20f9548c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
@@ -25,14 +25,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import com.google.common.collect.ImmutableList;
 
-import javax.annotation.Nullable;
-
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
@@ -45,25 +40,18 @@ public class LastQueryScanNode extends SourceNode {
   // The path of the target series which will be scanned.
   private final MeasurementPath seriesPath;
 
-  private Filter timeFilter;
-
   // The id of DataRegion where the node will run
   private TRegionReplicaSet regionReplicaSet;
 
-  public LastQueryScanNode(PlanNodeId id, MeasurementPath seriesPath, Filter 
timeFilter) {
+  public LastQueryScanNode(PlanNodeId id, MeasurementPath seriesPath) {
     super(id);
     this.seriesPath = seriesPath;
-    this.timeFilter = timeFilter;
   }
 
   public LastQueryScanNode(
-      PlanNodeId id,
-      MeasurementPath seriesPath,
-      Filter timeFilter,
-      TRegionReplicaSet regionReplicaSet) {
+      PlanNodeId id, MeasurementPath seriesPath, TRegionReplicaSet 
regionReplicaSet) {
     super(id);
     this.seriesPath = seriesPath;
-    this.timeFilter = timeFilter;
     this.regionReplicaSet = regionReplicaSet;
   }
 
@@ -84,15 +72,6 @@ public class LastQueryScanNode extends SourceNode {
     return seriesPath;
   }
 
-  @Nullable
-  public Filter getTimeFilter() {
-    return timeFilter;
-  }
-
-  public void setTimeFilter(@Nullable Filter timeFilter) {
-    this.timeFilter = timeFilter;
-  }
-
   @Override
   public void close() throws Exception {}
 
@@ -108,7 +87,7 @@ public class LastQueryScanNode extends SourceNode {
 
   @Override
   public PlanNode clone() {
-    return new LastQueryScanNode(getPlanNodeId(), seriesPath, timeFilter, 
regionReplicaSet);
+    return new LastQueryScanNode(getPlanNodeId(), seriesPath, 
regionReplicaSet);
   }
 
   @Override
@@ -133,13 +112,12 @@ public class LastQueryScanNode extends SourceNode {
     if (!super.equals(o)) return false;
     LastQueryScanNode that = (LastQueryScanNode) o;
     return Objects.equals(seriesPath, that.seriesPath)
-        && Objects.equals(timeFilter, that.timeFilter)
         && Objects.equals(regionReplicaSet, that.regionReplicaSet);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), seriesPath, timeFilter, 
regionReplicaSet);
+    return Objects.hash(super.hashCode(), seriesPath, regionReplicaSet);
   }
 
   @Override
@@ -153,21 +131,11 @@ public class LastQueryScanNode extends SourceNode {
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.LAST_QUERY_SCAN.serialize(byteBuffer);
     seriesPath.serialize(byteBuffer);
-    if (timeFilter == null) {
-      ReadWriteIOUtils.write((byte) 0, byteBuffer);
-    } else {
-      ReadWriteIOUtils.write((byte) 1, byteBuffer);
-      timeFilter.serialize(byteBuffer);
-    }
   }
 
   public static LastQueryScanNode deserialize(ByteBuffer byteBuffer) {
     MeasurementPath partialPath = (MeasurementPath) 
PathDeserializeUtil.deserialize(byteBuffer);
-    Filter timeFilter = null;
-    if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
-      timeFilter = FilterFactory.deserialize(byteBuffer);
-    }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new LastQueryScanNode(planNodeId, partialPath, timeFilter);
+    return new LastQueryScanNode(planNodeId, partialPath);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 62773b5ed9..af9526b142 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -67,6 +67,7 @@ import java.util.stream.Collectors;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES;
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_VALUE;
+import static 
org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
 
 public class LastQueryExecutor {
 
@@ -379,10 +380,6 @@ public class LastQueryExecutor {
     }
   }
 
-  private static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
-    return filter == null || filter.satisfy(tvPair.getTimestamp(), 
tvPair.getValue().getValue());
-  }
-
   public static void clear() {
     ID_TABLE_ENABLED = 
IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
new file mode 100644
index 0000000000..72ee86fb65
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+
+import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class LastCacheScanOperatorTest {
+
+  @Test
+  public void batchTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+
+      TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(6);
+
+      LastQueryUtil.appendLastValue(builder, 1, "root.sg.d.s1", "true", 
"BOOLEAN");
+      LastQueryUtil.appendLastValue(builder, 2, "root.sg.d.s2", "2", "INT32");
+      LastQueryUtil.appendLastValue(builder, 3, "root.sg.d.s3", "3", "INT64");
+      LastQueryUtil.appendLastValue(builder, 4, "root.sg.d.s4", "4.4", 
"FLOAT");
+      LastQueryUtil.appendLastValue(builder, 3, "root.sg.d.s5", "3.3", 
"DOUBLE");
+      LastQueryUtil.appendLastValue(builder, 1, "root.sg.d.s6", "peace", 
"TEXT");
+
+      TsBlock tsBlock = builder.build();
+
+      LastCacheScanOperator lastCacheScanOperator =
+          new LastCacheScanOperator(
+              fragmentInstanceContext.getOperatorContexts().get(0), 
planNodeId1, tsBlock);
+
+      assertTrue(lastCacheScanOperator.isBlocked().isDone());
+      assertTrue(lastCacheScanOperator.hasNext());
+      TsBlock result = lastCacheScanOperator.next();
+      assertEquals(tsBlock.getPositionCount(), result.getPositionCount());
+      assertEquals(tsBlock.getValueColumnCount(), 
result.getValueColumnCount());
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertEquals(tsBlock.getTimeByIndex(i), result.getTimeByIndex(i));
+        for (int j = 0; j < tsBlock.getValueColumnCount(); j++) {
+          assertEquals(tsBlock.getColumn(j).getBinary(i), 
result.getColumn(j).getBinary(i));
+        }
+      }
+      assertFalse(lastCacheScanOperator.hasNext());
+      assertTrue(lastCacheScanOperator.isFinished());
+
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
new file mode 100644
index 0000000000..03fa1a1584
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class LastQueryMergeOperatorTest {
+
+  private static final String SERIES_SCAN_OPERATOR_TEST_SG = 
"root.LastQueryMergeOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+  private ExecutorService instanceNotificationExecutor;
+
+  @Before
+  public void setUp() throws MetadataException, IOException, 
WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, 
SERIES_SCAN_OPERATOR_TEST_SG);
+    this.instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+    instanceNotificationExecutor.shutdown();
+  }
+
+  @Test
+  public void testLastQueryMergeOperatorTestWithoutCachedValue() {
+    try {
+      List<Aggregator> aggregators1 = 
LastQueryUtil.createAggregators(TSDataType.INT32);
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + 
".device0.sensor0", TSDataType.INT32);
+      List<Aggregator> aggregators2 = 
LastQueryUtil.createAggregators(TSDataType.INT32);
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + 
".device0.sensor1", TSDataType.INT32);
+      Set<String> allSensors = Sets.newHashSet("sensor0", "sensor1");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesAggregationScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, UpdateLastCacheOperator.class.getSimpleName());
+
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      fragmentInstanceContext.addOperatorContext(
+          3, planNodeId3, SeriesAggregationScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      fragmentInstanceContext.addOperatorContext(
+          4, planNodeId4, UpdateLastCacheOperator.class.getSimpleName());
+
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      fragmentInstanceContext.addOperatorContext(
+          5, planNodeId5, LastQueryMergeOperator.class.getSimpleName());
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator1 =
+          new SeriesAggregationScanOperator(
+              planNodeId1,
+              measurementPath1,
+              allSensors,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              aggregators1,
+              null,
+              false,
+              null);
+      seriesAggregationScanOperator1.initQueryDataSource(
+          new QueryDataSource(seqResources, unSeqResources));
+
+      UpdateLastCacheOperator updateLastCacheOperator1 =
+          new UpdateLastCacheOperator(
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              seriesAggregationScanOperator1,
+              measurementPath1,
+              measurementPath1.getSeriesType(),
+              null,
+              false);
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator2 =
+          new SeriesAggregationScanOperator(
+              planNodeId3,
+              measurementPath2,
+              allSensors,
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              aggregators2,
+              null,
+              false,
+              null);
+      seriesAggregationScanOperator2.initQueryDataSource(
+          new QueryDataSource(seqResources, unSeqResources));
+
+      UpdateLastCacheOperator updateLastCacheOperator2 =
+          new UpdateLastCacheOperator(
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              seriesAggregationScanOperator2,
+              measurementPath2,
+              measurementPath2.getSeriesType(),
+              null,
+              false);
+
+      LastQueryMergeOperator lastQueryMergeOperator =
+          new LastQueryMergeOperator(
+              fragmentInstanceContext.getOperatorContexts().get(4),
+              ImmutableList.of(updateLastCacheOperator1, 
updateLastCacheOperator2));
+
+      int count = 0;
+      while (!lastQueryMergeOperator.isFinished()) {
+        assertTrue(lastQueryMergeOperator.isBlocked().isDone());
+        assertTrue(lastQueryMergeOperator.hasNext());
+        TsBlock result = lastQueryMergeOperator.next();
+        if (result == null) {
+          continue;
+        }
+        assertEquals(3, result.getValueColumnCount());
+
+        for (int i = 0; i < result.getPositionCount(); i++) {
+          assertEquals(499, result.getTimeByIndex(i));
+          assertEquals(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor" + count,
+              result.getColumn(0).getBinary(i).toString());
+          assertEquals("10499", result.getColumn(1).getBinary(i).toString());
+          assertEquals(TSDataType.INT32.name(), 
result.getColumn(2).getBinary(i).toString());
+          count++;
+        }
+      }
+
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  @Test
+  public void testUpdateLastCacheOperatorTestWithCachedValue() {
+    try {
+      List<Aggregator> aggregators1 = 
LastQueryUtil.createAggregators(TSDataType.INT32);
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + 
".device0.sensor0", TSDataType.INT32);
+      List<Aggregator> aggregators2 = 
LastQueryUtil.createAggregators(TSDataType.INT32);
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + 
".device0.sensor1", TSDataType.INT32);
+      Set<String> allSensors = Sets.newHashSet("sensor0", "sensor1");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesAggregationScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, UpdateLastCacheOperator.class.getSimpleName());
+
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      fragmentInstanceContext.addOperatorContext(
+          3, planNodeId3, SeriesAggregationScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      fragmentInstanceContext.addOperatorContext(
+          4, planNodeId4, UpdateLastCacheOperator.class.getSimpleName());
+
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      fragmentInstanceContext.addOperatorContext(
+          5, planNodeId4, LastCacheScanOperator.class.getSimpleName());
+
+      PlanNodeId planNodeId6 = new PlanNodeId("6");
+      fragmentInstanceContext.addOperatorContext(
+          6, planNodeId6, LastQueryMergeOperator.class.getSimpleName());
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator1 =
+          new SeriesAggregationScanOperator(
+              planNodeId1,
+              measurementPath1,
+              allSensors,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              aggregators1,
+              null,
+              false,
+              null);
+      seriesAggregationScanOperator1.initQueryDataSource(
+          new QueryDataSource(seqResources, unSeqResources));
+
+      UpdateLastCacheOperator updateLastCacheOperator1 =
+          new UpdateLastCacheOperator(
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              seriesAggregationScanOperator1,
+              measurementPath1,
+              measurementPath1.getSeriesType(),
+              null,
+              false);
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator2 =
+          new SeriesAggregationScanOperator(
+              planNodeId3,
+              measurementPath2,
+              allSensors,
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              aggregators2,
+              null,
+              false,
+              null);
+      seriesAggregationScanOperator2.initQueryDataSource(
+          new QueryDataSource(seqResources, unSeqResources));
+
+      UpdateLastCacheOperator updateLastCacheOperator2 =
+          new UpdateLastCacheOperator(
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              seriesAggregationScanOperator2,
+              measurementPath2,
+              measurementPath2.getSeriesType(),
+              null,
+              false);
+
+      TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(6);
+
+      LastQueryUtil.appendLastValue(
+          builder, 499, SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor2", 
"10499", "INT32");
+      LastQueryUtil.appendLastValue(
+          builder, 499, SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor3", 
"10499", "INT32");
+      LastQueryUtil.appendLastValue(
+          builder, 499, SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor4", 
"10499", "INT32");
+
+      TsBlock tsBlock = builder.build();
+
+      LastCacheScanOperator lastCacheScanOperator =
+          new LastCacheScanOperator(
+              fragmentInstanceContext.getOperatorContexts().get(4), 
planNodeId5, tsBlock);
+
+      LastQueryMergeOperator lastQueryMergeOperator =
+          new LastQueryMergeOperator(
+              fragmentInstanceContext.getOperatorContexts().get(5),
+              ImmutableList.of(
+                  updateLastCacheOperator1, updateLastCacheOperator2, 
lastCacheScanOperator));
+
+      int count = 0;
+      while (!lastQueryMergeOperator.isFinished()) {
+        assertTrue(lastQueryMergeOperator.isBlocked().isDone());
+        assertTrue(lastQueryMergeOperator.hasNext());
+        TsBlock result = lastQueryMergeOperator.next();
+        if (result == null) {
+          continue;
+        }
+        assertEquals(3, result.getValueColumnCount());
+
+        for (int i = 0; i < result.getPositionCount(); i++) {
+          assertEquals(499, result.getTimeByIndex(i));
+          assertEquals(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor" + count,
+              result.getColumn(0).getBinary(i).toString());
+          assertEquals("10499", result.getColumn(1).getBinary(i).toString());
+          assertEquals(TSDataType.INT32.name(), 
result.getColumn(2).getBinary(i).toString());
+          count++;
+        }
+      }
+
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
index 0c62a46328..7e95c4ac2c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import 
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
-import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -479,7 +478,7 @@ public class SeriesAggregationScanOperatorTest {
         createFragmentInstanceContext(instanceId, stateMachine);
     PlanNodeId planNodeId = new PlanNodeId("1");
     fragmentInstanceContext.addOperatorContext(
-        1, planNodeId, SeriesScanOperator.class.getSimpleName());
+        1, planNodeId, SeriesAggregationScanOperator.class.getSimpleName());
 
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         new SeriesAggregationScanOperator(
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
new file mode 100644
index 0000000000..b78cb7b1ac
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link UpdateLastCacheOperator} when it is stacked on a {@link
+ * SeriesAggregationScanOperator} that reads {@code device0.sensor0} from the resources prepared by
+ * {@link SeriesReaderTestUtil#setUp}.
+ *
+ * <p>Every test expects exactly one result row: a timestamp plus three binary columns holding the
+ * full series path, the value rendered as text, and the data type name.
+ */
+public class UpdateLastCacheOperatorTest {
+
+  private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.UpdateLastCacheOperator";
+
+  /** Full path of the only series queried by these tests. */
+  private static final String SENSOR0_PATH = SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0";
+
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+  private ExecutorService instanceNotificationExecutor;
+
+  /** Prepares the test resources and the executor handed to the fragment state machine. */
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
+    this.instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+  }
+
+  /**
+   * Releases the test resources and always shuts the executor down, even when the resource
+   * clean-up throws, so a failing clean-up cannot leak the thread pool into later tests.
+   */
+  @After
+  public void tearDown() throws IOException {
+    try {
+      SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+    } finally {
+      // The executor is still null when setUp failed before creating it.
+      if (instanceNotificationExecutor != null) {
+        instanceNotificationExecutor.shutdown();
+      }
+    }
+  }
+
+  /** Without a time filter the test expects the row at time 499 with value 10499. */
+  @Test
+  public void testUpdateLastCacheOperatorTestWithoutTimeFilter() {
+    assertSingleLastRow(null, 499, "10499");
+  }
+
+  /** With {@code time >= 200} the test still expects the row at time 499 with value 10499. */
+  @Test
+  public void testUpdateLastCacheOperatorTestWithTimeFilter1() {
+    assertSingleLastRow(TimeFilter.gtEq(200), 499, "10499");
+  }
+
+  /** With {@code time <= 120} the test expects the row at time 120 with value 20120. */
+  @Test
+  public void testUpdateLastCacheOperatorTestWithTimeFilter2() {
+    assertSingleLastRow(TimeFilter.ltEq(120), 120, "20120");
+  }
+
+  /**
+   * Builds the operator for {@code timeFilter}, pulls one block from it and checks that the block
+   * holds exactly one row with the expected content and that the operator is finished afterwards.
+   *
+   * @param timeFilter time filter handed to the scan operator, or {@code null} for none
+   * @param expectedTime expected timestamp of the single result row
+   * @param expectedValue expected text in the value column of the single result row
+   */
+  private void assertSingleLastRow(Filter timeFilter, long expectedTime, String expectedValue) {
+    try {
+      List<Aggregator> aggregators = LastQueryUtil.createAggregators(TSDataType.INT32);
+      UpdateLastCacheOperator updateLastCacheOperator =
+          initUpdateLastCacheOperator(aggregators, timeFilter, false, null);
+
+      assertTrue(updateLastCacheOperator.isBlocked().isDone());
+      assertTrue(updateLastCacheOperator.hasNext());
+      TsBlock result = updateLastCacheOperator.next();
+      assertEquals(1, result.getPositionCount());
+      assertEquals(3, result.getValueColumnCount());
+
+      assertEquals(expectedTime, result.getTimeByIndex(0));
+      assertEquals(SENSOR0_PATH, result.getColumn(0).getBinary(0).toString());
+      assertEquals(expectedValue, result.getColumn(1).getBinary(0).toString());
+      assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(0).toString());
+
+      assertFalse(updateLastCacheOperator.hasNext());
+      assertTrue(updateLastCacheOperator.isFinished());
+    } catch (IllegalPathException e) {
+      // Keep the stack trace on stderr and put the cause into the test report as well.
+      e.printStackTrace();
+      fail("Unexpected IllegalPathException: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Creates an {@link UpdateLastCacheOperator} whose child is a {@link
+   * SeriesAggregationScanOperator} over {@code device0.sensor0} (INT32) backed by the sequence and
+   * unsequence resources of this test.
+   *
+   * @param aggregators aggregators handed to the scan operator
+   * @param timeFilter time filter handed to the scan operator, may be {@code null}
+   * @param ascending scan order flag handed to the scan operator
+   * @param groupByTimeParameter group-by-time parameter handed to the scan operator, may be {@code
+   *     null}
+   * @return the operator under test
+   * @throws IllegalPathException if the measurement path cannot be constructed
+   */
+  public UpdateLastCacheOperator initUpdateLastCacheOperator(
+      List<Aggregator> aggregators,
+      Filter timeFilter,
+      boolean ascending,
+      GroupByTimeParameter groupByTimeParameter)
+      throws IllegalPathException {
+    MeasurementPath measurementPath = new MeasurementPath(SENSOR0_PATH, TSDataType.INT32);
+    Set<String> allSensors = Sets.newHashSet("sensor0");
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    PlanNodeId planNodeId1 = new PlanNodeId("1");
+    fragmentInstanceContext.addOperatorContext(
+        1, planNodeId1, SeriesAggregationScanOperator.class.getSimpleName());
+    PlanNodeId planNodeId2 = new PlanNodeId("2");
+    fragmentInstanceContext.addOperatorContext(
+        2, planNodeId2, UpdateLastCacheOperator.class.getSimpleName());
+
+    // NOTE(review): get(0) / get(1) are list positions, not the operator ids 1 and 2 used above;
+    // this assumes getOperatorContexts() keeps registration order - confirm.
+    SeriesAggregationScanOperator seriesAggregationScanOperator =
+        new SeriesAggregationScanOperator(
+            planNodeId1,
+            measurementPath,
+            allSensors,
+            fragmentInstanceContext.getOperatorContexts().get(0),
+            aggregators,
+            timeFilter,
+            ascending,
+            groupByTimeParameter);
+    seriesAggregationScanOperator.initQueryDataSource(
+        new QueryDataSource(seqResources, unSeqResources));
+
+    // NOTE(review): the trailing null / false are presumably the last-cache handle and the
+    // "update cache" flag - verify against the UpdateLastCacheOperator constructor.
+    return new UpdateLastCacheOperator(
+        fragmentInstanceContext.getOperatorContexts().get(1),
+        seriesAggregationScanOperator,
+        measurementPath,
+        measurementPath.getSeriesType(),
+        null,
+        false);
+  }
+}

Reply via email to