This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new f0ce39615e3 [To rel/1.2] Cherry-pick last query feature for non-base view
f0ce39615e3 is described below

commit f0ce39615e336537c197beabc2dabdb6c47e528f
Author: Beyyes <[email protected]>
AuthorDate: Thu Sep 28 10:01:20 2023 +0800

    [To rel/1.2] Cherry-pick last query feature for non-base view
---
 .../operator/process/last/LastQueryOperator.java   |   6 +-
 .../process/last/LastQuerySortOperator.java        |   4 +-
 .../process/last/LastQueryTransformOperator.java   | 120 ++++++++++++++++++
 .../db/queryengine/plan/analyze/Analysis.java      |  24 +++-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  29 +++--
 .../schema/lastcache/ILastCacheContainer.java      |   2 +-
 .../plan/planner/LogicalPlanBuilder.java           | 138 ++++++++++++++++-----
 .../plan/planner/LogicalPlanVisitor.java           |  10 +-
 .../plan/planner/OperatorTreeGenerator.java        |  26 +++-
 .../plan/planner/SubPlanTypeExtractor.java         |  15 +++
 .../distribution/DistributionPlanContext.java      |   5 +-
 .../planner/distribution/DistributionPlanner.java  |  10 +-
 .../planner/distribution/ExchangeNodeAdder.java    |  82 ++++++------
 .../planner/distribution/NodeGroupContext.java     |  18 ++-
 .../SimpleFragmentParallelPlanner.java             |   5 +-
 .../plan/planner/distribution/SourceRewriter.java  |  91 +++++++-------
 .../db/queryengine/plan/planner/plan/SubPlan.java  |   5 +-
 .../plan/planner/plan/node/PlanGraphPrinter.java   |   9 ++
 .../plan/planner/plan/node/PlanNodeType.java       |   6 +-
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../node/process/last/LastQueryCollectNode.java    |  17 ++-
 .../plan/node/process/last/LastQueryMergeNode.java |  24 ++--
 .../plan/node/process/last/LastQueryNode.java      |  27 +++-
 ...ollectNode.java => LastQueryTransformNode.java} |  92 ++++++++------
 .../planner/plan/node/sink/IdentitySinkNode.java   |   5 +
 .../plan/node/source/AlignedSeriesScanNode.java    |   4 +-
 .../execution/operator/OperatorMemoryTest.java     |   5 +-
 .../plan/plan/QueryLogicalPlanUtil.java            |   2 +-
 .../plan/plan/distribution/LastQueryTest.java      |   2 +-
 29 files changed, 569 insertions(+), 219 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
index 1408207c828..b1150507b55 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
@@ -44,7 +44,7 @@ public class LastQueryOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
 
-  private final List<AbstractUpdateLastCacheOperator> children;
+  private final List<Operator> children;
 
   private final int inputOperatorsCount;
 
@@ -53,9 +53,7 @@ public class LastQueryOperator implements ProcessOperator {
   private TsBlockBuilder tsBlockBuilder;
 
   public LastQueryOperator(
-      OperatorContext operatorContext,
-      List<AbstractUpdateLastCacheOperator> children,
-      TsBlockBuilder builder) {
+      OperatorContext operatorContext, List<Operator> children, TsBlockBuilder 
builder) {
     this.operatorContext = operatorContext;
     this.children = children;
     this.inputOperatorsCount = children.size();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
index f08aa156b21..aafdc71df9d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
@@ -55,7 +55,7 @@ public class LastQuerySortOperator implements ProcessOperator 
{
   private int cachedTsBlockRowIndex;
 
   // we must make sure that Operator in children has already been sorted
-  private final List<AbstractUpdateLastCacheOperator> children;
+  private final List<Operator> children;
 
   private final OperatorContext operatorContext;
 
@@ -75,7 +75,7 @@ public class LastQuerySortOperator implements ProcessOperator 
{
   public LastQuerySortOperator(
       OperatorContext operatorContext,
       TsBlock cachedTsBlock,
-      List<AbstractUpdateLastCacheOperator> children,
+      List<Operator> children,
       Comparator<Binary> timeSeriesComparator) {
     this.cachedTsBlock = cachedTsBlock;
     this.cachedTsBlockSize = cachedTsBlock.getPositionCount();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryTransformOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryTransformOperator.java
new file mode 100644
index 00000000000..0b87821bb1d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryTransformOperator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.last;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Turns the output of a single child operator into a row of the LAST query result format and
+ * reports that row under a view path.
+ *
+ * <p>For every non-empty block returned by the child, only row 0 is read: column 0 is read as a
+ * long timestamp and column 1 as the value. Both are appended through {@code
+ * LastQueryUtil.appendLastValue} together with the view path and data type given at construction.
+ */
+public class LastQueryTransformOperator implements ProcessOperator {
+
+  // path written as the timeseries name of every produced row
+  private final String viewPath;
+
+  // data type name written into every produced row
+  private final String dataType;
+
+  private final OperatorContext operatorContext;
+
+  // the child of LastQueryTransformOperator will always be AggOperator
+  private final Operator child;
+
+  // not final: released (set to null) in close()
+  private TsBlockBuilder tsBlockBuilder;
+
+  /**
+   * @param viewPath path reported for the produced row
+   * @param dataType data type name reported for the produced row
+   * @param operatorContext context of this operator
+   * @param child operator whose output is transformed
+   */
+  public LastQueryTransformOperator(
+      String viewPath, String dataType, OperatorContext operatorContext, Operator child) {
+    this.viewPath = viewPath;
+    this.dataType = dataType;
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return this.operatorContext;
+  }
+
+  /** This operator is blocked exactly as long as its child is. */
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  /**
+   * Pulls one block from the child and converts its first row.
+   *
+   * @return {@code null} when the child returns {@code null} or when the value in column 1, row 0
+   *     is null; otherwise the block built from the builder, which holds no appended row when the
+   *     child's block was empty
+   */
+  @Override
+  public TsBlock next() throws Exception {
+    if (!tsBlockBuilder.isFull()) {
+      TsBlock tsBlock = child.nextWithTimer();
+      if (tsBlock == null) {
+        return null;
+      } else if (!tsBlock.isEmpty()) {
+        if (tsBlock.getColumn(1).isNull(0)) {
+          // a null value means there is nothing to report for this view
+          return null;
+        }
+        LastQueryUtil.appendLastValue(
+            tsBlockBuilder,
+            tsBlock.getColumn(0).getLong(0),
+            viewPath,
+            tsBlock.getColumn(1).getTsPrimitiveType(0).getStringValue(),
+            dataType);
+      }
+    } else {
+      // NOTE(review): the builder is reset after every build below, so this branch looks
+      // unreachable in practice - confirm before relying on it
+      child.close();
+    }
+
+    // hand out what was collected and leave the builder empty for the next call
+    TsBlock res = tsBlockBuilder.build();
+    tsBlockBuilder.reset();
+    return res;
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    return child.hasNext();
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !hasNextWithTimer();
+  }
+
+  /** The larger of the child's peak memory and the child's retained size after next(). */
+  @Override
+  public long calculateMaxPeekMemory() {
+    return Math.max(child.calculateMaxPeekMemory(), child.calculateRetainedSizeAfterCallingNext());
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
+
+  /** Closes the child and drops the block builder. */
+  @Override
+  public void close() throws Exception {
+    if (child != null) {
+      child.close();
+    }
+    tsBlockBuilder = null;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index caeb12e3295..86528182392 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -209,6 +209,11 @@ public class Analysis {
   // timeseries, otherwise it will be null
   private Ordering timeseriesOrderingForLastQuery = null;
 
+  // Key: non-writable view expression, Value: corresponding source expressions
+  private Map<Expression, List<Expression>> 
lastQueryNonWritableViewSourceExpressionMap;
+
+  private Set<Expression> lastQueryBaseExpressions;
+
   // header of result dataset
   private DatasetHeader respDatasetHeader;
 
@@ -344,7 +349,7 @@ public class Analysis {
       return null;
     }
     TSDataType type = expressionTypes.get(NodeRef.of(expression));
-    checkArgument(type != null, "Expression not analyzed: %s", expression);
+    checkArgument(type != null, "Expression is not analyzed: %s", expression);
     return type;
   }
 
@@ -726,6 +731,23 @@ public class Analysis {
     this.timeseriesOrderingForLastQuery = timeseriesOrderingForLastQuery;
   }
 
+  /** Returns the set stored by {@link #setLastQueryBaseExpressions}; null if it was never set. */
+  public Set<Expression> getLastQueryBaseExpressions() {
+    return this.lastQueryBaseExpressions;
+  }
+
+  /** Stores the given set as the base expressions of a last query; the set is not copied. */
+  public void setLastQueryBaseExpressions(Set<Expression> 
lastQueryBaseExpressions) {
+    this.lastQueryBaseExpressions = lastQueryBaseExpressions;
+  }
+
+  /**
+   * Returns the map from a non-writable view expression to its source expressions, or null if no
+   * map has been set.
+   */
+  public Map<Expression, List<Expression>> 
getLastQueryNonWritableViewSourceExpressionMap() {
+    return this.lastQueryNonWritableViewSourceExpressionMap;
+  }
+
+  /**
+   * Stores the map from a non-writable view expression to its source expressions; the map is not
+   * copied and may be null.
+   */
+  public void setLastQueryNonWritableViewSourceExpressionMap(
+      Map<Expression, List<Expression>> 
lastQueryNonWritableViewSourceExpressionMap) {
+    this.lastQueryNonWritableViewSourceExpressionMap = 
lastQueryNonWritableViewSourceExpressionMap;
+  }
+
   public Map<String, List<String>> getOutputDeviceToQueriedDevicesMap() {
     return outputDeviceToQueriedDevicesMap;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 051b55db58f..87e9a6b30f0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -435,20 +435,33 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
 
   private void analyzeLastSource(
       Analysis analysis, List<Expression> selectExpressions, ISchemaTree 
schemaTree) {
-    Set<Expression> sourceExpressions;
-
-    sourceExpressions = new LinkedHashSet<>();
+    Set<Expression> sourceExpressions = new LinkedHashSet<>();
+    Set<Expression> lastQueryBaseExpressions = new LinkedHashSet<>();
+    Map<Expression, List<Expression>> 
lastQueryNonWritableViewSourceExpressionMap = null;
 
     for (Expression selectExpression : selectExpressions) {
-      for (Expression sourceExpression : 
bindSchemaForExpression(selectExpression, schemaTree)) {
-        if (!(sourceExpression instanceof TimeSeriesOperand)) {
-          throw new SemanticException(
-              "Views with functions and expressions cannot be used in LAST 
query");
+      for (Expression lastQuerySourceExpression :
+          bindSchemaForExpression(selectExpression, schemaTree)) {
+        if (lastQuerySourceExpression instanceof TimeSeriesOperand) {
+          lastQueryBaseExpressions.add(lastQuerySourceExpression);
+          sourceExpressions.add(lastQuerySourceExpression);
+        } else {
+          if (lastQueryNonWritableViewSourceExpressionMap == null) {
+            lastQueryNonWritableViewSourceExpressionMap = new HashMap<>();
+          }
+          List<Expression> sourceExpressionsOfNonWritableView =
+              searchSourceExpressions(lastQuerySourceExpression);
+          lastQueryNonWritableViewSourceExpressionMap.put(
+              lastQuerySourceExpression, sourceExpressionsOfNonWritableView);
+          sourceExpressions.addAll(sourceExpressionsOfNonWritableView);
         }
-        sourceExpressions.add(sourceExpression);
       }
     }
+
     analysis.setSourceExpressions(sourceExpressions);
+    analysis.setLastQueryBaseExpressions(lastQueryBaseExpressions);
+    analysis.setLastQueryNonWritableViewSourceExpressionMap(
+        lastQueryNonWritableViewSourceExpressionMap);
   }
 
   private void updateSchemaTreeByViews(Analysis analysis, ISchemaTree 
originSchemaTree) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java
index 20f5eb711db..3ada99d8ebf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.tsfile.read.TimeValuePair;
 /** this interface declares the operations of LastCache data */
 public interface ILastCacheContainer {
 
-  // get lastCache of monad timseries
+  // get lastCache of monad timeseries
   TimeValuePair getCachedLast();
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index fbd80dbe22f..ee7472df6fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -71,6 +71,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
@@ -106,11 +107,14 @@ import org.apache.commons.lang.Validate;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -121,7 +125,10 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE;
 import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_VALUE;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.MAX_TIME;
 
 public class LogicalPlanBuilder {
 
@@ -214,16 +221,13 @@ public class LogicalPlanBuilder {
   }
 
   public LogicalPlanBuilder planLast(
-      Set<Expression> sourceExpressions, Filter globalTimeFilter, Ordering 
timeseriesOrdering) {
-    List<PlanNode> sourceNodeList = new ArrayList<>();
+      Analysis analysis, Ordering timeseriesOrdering, ZoneId zoneId) {
+    Set<String> deviceAlignedSet = new HashSet<>();
+    Set<String> deviceExistViewSet = new HashSet<>();
+    // <Device, <Measurement, Expression>>
+    Map<String, Map<String, Expression>> outputPathToSourceExpressionMap = new 
LinkedHashMap<>();
 
-    Map<String, Boolean> deviceAlignedMap = new HashMap<>();
-    Map<String, Boolean> deviceExistViewMap = new HashMap<>();
-    Map<String, Map<String, Expression>> outputPathToSourceExpressionMap =
-        timeseriesOrdering != null
-            ? new TreeMap<>(timeseriesOrdering.getStringComparator())
-            : new LinkedHashMap<>();
-    for (Expression sourceExpression : sourceExpressions) {
+    for (Expression sourceExpression : analysis.getLastQueryBaseExpressions()) 
{
       MeasurementPath outputPath =
           (MeasurementPath)
               (sourceExpression.isViewExpression()
@@ -238,46 +242,47 @@ public class LogicalPlanBuilder {
                       ? new TreeMap<>(timeseriesOrdering.getStringComparator())
                       : new LinkedHashMap<>())
           .put(outputPath.getMeasurement(), sourceExpression);
-      if (!deviceAlignedMap.containsKey(outputDevice)) {
-        deviceAlignedMap.put(outputDevice, outputPath.isUnderAlignedEntity());
+      if (outputPath.isUnderAlignedEntity()) {
+        deviceAlignedSet.add(outputDevice);
+      }
+      if (sourceExpression.isViewExpression()) {
+        deviceExistViewSet.add(outputDevice);
       }
-      deviceExistViewMap.put(
-          outputDevice,
-          deviceExistViewMap.getOrDefault(outputDevice, false)
-              || sourceExpression.isViewExpression());
     }
 
+    List<PlanNode> sourceNodeList = new ArrayList<>();
     for (Map.Entry<String, Map<String, Expression>> 
deviceMeasurementExpressionEntry :
         outputPathToSourceExpressionMap.entrySet()) {
       String outputDevice = deviceMeasurementExpressionEntry.getKey();
-      if (deviceExistViewMap.get(outputDevice)) {
+      Map<String, Expression> measurementToExpressionsOfDevice =
+          deviceMeasurementExpressionEntry.getValue();
+      if (deviceExistViewSet.contains(outputDevice)) {
         // exist view
-        for (Expression sourceExpression : 
deviceMeasurementExpressionEntry.getValue().values()) {
+        for (Expression sourceExpression : 
measurementToExpressionsOfDevice.values()) {
           MeasurementPath selectedPath =
               (MeasurementPath) ((TimeSeriesOperand) 
sourceExpression).getPath();
+          String outputViewPath =
+              sourceExpression.isViewExpression()
+                  ? sourceExpression.getViewPath().getFullPath()
+                  : null;
+
           if (selectedPath.isUnderAlignedEntity()) { // aligned series
             sourceNodeList.add(
                 new AlignedLastQueryScanNode(
                     context.getQueryId().genPlanNodeId(),
                     new AlignedPath(selectedPath),
-                    sourceExpression.isViewExpression()
-                        ? sourceExpression.getViewPath().getFullPath()
-                        : null));
+                    outputViewPath));
           } else { // non-aligned series
             sourceNodeList.add(
                 new LastQueryScanNode(
-                    context.getQueryId().genPlanNodeId(),
-                    selectedPath,
-                    sourceExpression.isViewExpression()
-                        ? sourceExpression.getViewPath().getFullPath()
-                        : null));
+                    context.getQueryId().genPlanNodeId(), selectedPath, 
outputViewPath));
           }
         }
       } else {
-        if (deviceAlignedMap.get(outputDevice)) {
+        if (deviceAlignedSet.contains(outputDevice)) {
           // aligned series
           List<MeasurementPath> measurementPaths =
-              deviceMeasurementExpressionEntry.getValue().values().stream()
+              measurementToExpressionsOfDevice.values().stream()
                   .map(expression -> (MeasurementPath) ((TimeSeriesOperand) 
expression).getPath())
                   .collect(Collectors.toList());
           AlignedPath alignedPath = new 
AlignedPath(measurementPaths.get(0).getDevicePath());
@@ -289,7 +294,7 @@ public class LogicalPlanBuilder {
                   context.getQueryId().genPlanNodeId(), alignedPath, null));
         } else {
           // non-aligned series
-          for (Expression sourceExpression : 
deviceMeasurementExpressionEntry.getValue().values()) {
+          for (Expression sourceExpression : 
measurementToExpressionsOfDevice.values()) {
             MeasurementPath selectedPath =
                 (MeasurementPath) ((TimeSeriesOperand) 
sourceExpression).getPath();
             sourceNodeList.add(
@@ -299,12 +304,35 @@ public class LogicalPlanBuilder {
       }
     }
 
+    processLastQueryTransformNode(analysis, sourceNodeList, zoneId);
+
+    if (timeseriesOrdering != null) {
+      sourceNodeList.sort(
+          Comparator.comparing(
+              child -> {
+                String sortKey = "";
+                if (child instanceof LastQueryScanNode) {
+                  sortKey = ((LastQueryScanNode) 
child).getOutputSymbolForSort();
+                } else if (child instanceof AlignedLastQueryScanNode) {
+                  sortKey = ((AlignedLastQueryScanNode) 
child).getOutputSymbolForSort();
+                } else if (child instanceof LastQueryTransformNode) {
+                  sortKey = ((LastQueryTransformNode) 
child).getOutputSymbolForSort();
+                }
+                return sortKey;
+              }));
+      if (timeseriesOrdering.equals(Ordering.DESC)) {
+        Collections.reverse(sourceNodeList);
+      }
+    }
+
     this.root =
         new LastQueryNode(
             context.getQueryId().genPlanNodeId(),
             sourceNodeList,
-            globalTimeFilter,
-            timeseriesOrdering);
+            analysis.getGlobalTimeFilter(),
+            timeseriesOrdering,
+            analysis.getLastQueryNonWritableViewSourceExpressionMap() != null);
+
     ColumnHeaderConstant.lastQueryColumnHeaders.forEach(
         columnHeader ->
             context
@@ -314,6 +342,56 @@ public class LogicalPlanBuilder {
     return this;
   }
 
+  /**
+   * Appends one {@link LastQueryTransformNode} to {@code sourceNodeList} for every entry of the
+   * analysis' non-writable-view map; does nothing when that map is null.
+   *
+   * <p>For each view a separate sub-plan is built in descending time order: a raw data source
+   * over the view's source expressions, a source transform that evaluates the view expression,
+   * and a single-step aggregation computing MAX_TIME and LAST_VALUE of the view expression. The
+   * sub-plan root becomes the child of the new transform node.
+   *
+   * @param analysis analysis providing the view map, time filter and group-by parameters
+   * @param sourceNodeList list the created nodes are added to
+   * @param zoneId zone id handed to the source transform
+   */
+  private void processLastQueryTransformNode(
+      Analysis analysis, List<PlanNode> sourceNodeList, ZoneId zoneId) {
+    Map<Expression, List<Expression>> viewToSourceExpressions =
+        analysis.getLastQueryNonWritableViewSourceExpressionMap();
+    if (viewToSourceExpressions == null) {
+      return;
+    }
+
+    for (Map.Entry<Expression, List<Expression>> viewEntry : viewToSourceExpressions.entrySet()) {
+      Expression viewExpression = viewEntry.getKey();
+      Set<Expression> viewSources = new LinkedHashSet<>(viewEntry.getValue());
+      Set<Expression> viewTransforms = Collections.singleton(viewExpression);
+
+      // both aggregations take the view expression itself as their only input
+      List<Expression> aggregationInput = Collections.singletonList(viewExpression);
+      FunctionExpression maxTimeAgg =
+          new FunctionExpression(MAX_TIME, new LinkedHashMap<>(), aggregationInput);
+      FunctionExpression lastValueAgg =
+          new FunctionExpression(LAST_VALUE, new LinkedHashMap<>(), aggregationInput);
+
+      analyzeExpression(analysis, viewExpression);
+      analyzeExpression(analysis, maxTimeAgg);
+      analyzeExpression(analysis, lastValueAgg);
+
+      LogicalPlanBuilder viewPlanBuilder =
+          new LogicalPlanBuilder(analysis, context)
+              .planRawDataSource(
+                  viewSources,
+                  Ordering.DESC,
+                  analysis.getGlobalTimeFilter(),
+                  analysis.isLastLevelUseWildcard())
+              .planWhereAndSourceTransform(null, viewTransforms, false, zoneId, Ordering.DESC)
+              .planAggregation(
+                  new LinkedHashSet<>(Arrays.asList(maxTimeAgg, lastValueAgg)),
+                  null,
+                  analysis.getGroupByTimeParameter(),
+                  analysis.getGroupByParameter(),
+                  false,
+                  AggregationStep.SINGLE,
+                  Ordering.DESC);
+
+      sourceNodeList.add(
+          new LastQueryTransformNode(
+              context.getQueryId().genPlanNodeId(),
+              viewPlanBuilder.getRoot(),
+              viewExpression.getViewPath().getFullPath(),
+              analysis.getType(viewExpression).toString()));
+    }
+  }
+
   public LogicalPlanBuilder planAggregationSource(
       AggregationStep curStep,
       Ordering scanOrder,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index d5180aacf46..9a2d72dda40 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -92,6 +92,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -127,9 +128,9 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
       planBuilder =
           planBuilder
               .planLast(
-                  analysis.getSourceExpressions(),
-                  analysis.getGlobalTimeFilter(),
-                  analysis.getTimeseriesOrderingForLastQuery())
+                  analysis,
+                  analysis.getTimeseriesOrderingForLastQuery(),
+                  queryStatement.getSelectComponent().getZoneId())
               .planOffset(queryStatement.getRowOffset())
               .planLimit(queryStatement.getRowLimit());
 
@@ -593,13 +594,14 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
                 showTimeSeriesStatement.isPrefixPath(),
                 analysis.getRelatedTemplateInfo())
             .planSchemaQueryMerge(showTimeSeriesStatement.isOrderByHeat());
+
     // show latest timeseries
     if (showTimeSeriesStatement.isOrderByHeat()
         && null != analysis.getDataPartitionInfo()
         && 0 != analysis.getDataPartitionInfo().getDataPartitionMap().size()) {
       PlanNode lastPlanNode =
           new LogicalPlanBuilder(analysis, context)
-              .planLast(analysis.getSourceExpressions(), 
analysis.getGlobalTimeFilter(), null)
+              .planLast(analysis, null, ZoneId.systemDefault())
               .getRoot();
       planBuilder = planBuilder.planSchemaQueryOrderByHeat(lastPlanNode);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 3585b301e10..628d8ba879a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -92,13 +92,13 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.Mul
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator;
-import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.AbstractUpdateLastCacheOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.AlignedUpdateLastCacheOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.AlignedUpdateViewPathLastCacheOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryCollectOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryMergeOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQuerySortOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryTransformOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.UpdateLastCacheOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.UpdateViewPathLastCacheOperator;
@@ -179,6 +179,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
@@ -2020,8 +2021,8 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
   public Operator visitLastQueryScan(LastQueryScanNode node, 
LocalExecutionPlanContext context) {
     PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
     TimeValuePair timeValuePair = null;
+    context.dataNodeQueryContext.lock();
     try {
-      context.dataNodeQueryContext.lock();
       if (!context.dataNodeQueryContext.unCached(seriesPath)) {
         timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
         if (timeValuePair == null) {
@@ -2279,16 +2280,14 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
 
   @Override
   public Operator visitLastQuery(LastQueryNode node, LocalExecutionPlanContext 
context) {
-
     context.setLastQueryTimeFilter(node.getTimeFilter());
     
context.setNeedUpdateLastCache(LastQueryUtil.needUpdateCache(node.getTimeFilter()));
     
context.setNeedUpdateNullEntry(LastQueryUtil.needUpdateNullEntry(node.getTimeFilter()));
 
-    List<AbstractUpdateLastCacheOperator> operatorList =
+    List<Operator> operatorList =
         node.getChildren().stream()
             .map(child -> child.accept(this, context))
             .filter(Objects::nonNull)
-            .map(o -> (AbstractUpdateLastCacheOperator) o)
             .collect(Collectors.toList());
 
     List<Pair<TimeValuePair, Binary>> cachedLastValueAndPathList =
@@ -2390,6 +2389,30 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     return new LastQueryCollectOperator(operatorContext, children);
   }
 
+  /**
+   * Builds the operator for a {@link LastQueryTransformNode}: the node's
+   * child is visited first, and the resulting operator is wrapped in a
+   * {@link LastQueryTransformOperator} created with the node's view path
+   * and data type. The operator context is registered under the
+   * transform operator's own class name.
+   */
+  @Override
+  public Operator visitLastQueryTransform(
+      LastQueryTransformNode node, LocalExecutionPlanContext context) {
+    Operator operator = node.getChild().accept(this, context);
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                LastQueryTransformOperator.class.getSimpleName());
+
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+    return new LastQueryTransformOperator(
+        node.getViewPath(), node.getDataType(), operatorContext, operator);
+  }
+
   private Map<String, List<InputLocation>> makeLayout(PlanNode node) {
     Map<String, List<InputLocation>> outputMappings = new LinkedHashMap<>();
     int tsBlockIndex = 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
index 12b083c9af9..b8920cffbc7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
@@ -128,19 +129,33 @@ public class SubPlanTypeExtractor {
 
     @Override
     public Void visitLastQuery(LastQueryNode node, Void context) {
+      if (node.isContainsLastTransformNode()) {
+        return visitPlan(node, context);
+      }
       return null;
     }
 
     @Override
     public Void visitLastQueryMerge(LastQueryMergeNode node, Void context) {
+      if (node.isContainsLastTransformNode()) {
+        return visitPlan(node, context);
+      }
       return null;
     }
 
     @Override
     public Void visitLastQueryCollect(LastQueryCollectNode node, Void context) 
{
+      if (node.isContainsLastTransformNode()) {
+        return visitPlan(node, context);
+      }
       return null;
     }
 
+    @Override
+    public Void visitLastQueryTransform(LastQueryTransformNode node, Void 
context) {
+      return visitPlan(node, context);
+    }
+
     // end region PlanNode of last read
 
     private void updateTypeProviderByAggregationDescriptor(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
index 87efdf508b7..ce6b25c26dc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
@@ -41,7 +41,6 @@ public class DistributionPlanContext {
   protected DistributionPlanContext(MPPQueryContext queryContext) {
     this.isRoot = true;
     this.queryContext = queryContext;
-    this.forceAddParent = false;
   }
 
   protected DistributionPlanContext copy() {
@@ -53,8 +52,8 @@ public class DistributionPlanContext {
     return this;
   }
 
-  protected void setForceAddParent(boolean forceAddParent) {
-    this.forceAddParent = forceAddParent;
+  protected void setForceAddParent() {
+    this.forceAddParent = true;
   }
 
   public void setOneSeriesInMultiRegion(boolean oneSeriesInMultiRegion) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index 2f14ffcd112..f0e016a5e52 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@ -81,11 +81,7 @@ public class DistributionPlanner {
   public PlanNode addExchangeNode(PlanNode root) {
     ExchangeNodeAdder adder = new ExchangeNodeAdder(this.analysis);
     NodeGroupContext nodeGroupContext =
-        new NodeGroupContext(
-            context,
-            analysis.getStatement() instanceof QueryStatement
-                && (((QueryStatement) 
analysis.getStatement()).isAlignByDevice()),
-            root);
+        new NodeGroupContext(context, analysis.getStatement(), root);
     PlanNode newRoot = adder.visit(root, nodeGroupContext);
     adjustUpStream(newRoot, nodeGroupContext);
     return newRoot;
@@ -196,7 +192,7 @@ public class DistributionPlanner {
     List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
     // Only execute this step for READ operation
     if (context.getQueryType() == QueryType.READ) {
-      SetSinkForRootInstance(subPlan, fragmentInstances);
+      setSinkForRootInstance(subPlan, fragmentInstances);
     }
     return new DistributedQueryPlan(
         logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), 
fragmentInstances);
@@ -213,7 +209,7 @@ public class DistributionPlanner {
   }
 
   // TODO: (xingtanzjr) Maybe we should handle ResultNode in LogicalPlanner ?
-  public void SetSinkForRootInstance(SubPlan subPlan, List<FragmentInstance> 
instances) {
+  public void setSinkForRootInstance(SubPlan subPlan, List<FragmentInstance> 
instances) {
     FragmentInstance rootInstance = null;
     for (FragmentInstance instance : instances) {
       if 
(instance.getFragment().getId().equals(subPlan.getPlanFragment().getId())) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index b031ca9947e..779f101a52b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -50,6 +50,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
@@ -58,8 +59,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggre
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -106,11 +105,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
 
   private PlanNode internalVisitSchemaMerge(
       AbstractSchemaMergeNode node, NodeGroupContext context) {
-    node.getChildren()
-        .forEach(
-            child -> {
-              visit(child, context);
-            });
+    node.getChildren().forEach(child -> visit(child, context));
     NodeDistribution nodeDistribution =
         new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN);
     PlanNode newNode = node.clone();
@@ -229,6 +224,11 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     return processMultiChildNode(node, context);
   }
 
+  @Override
+  public PlanNode visitLastQueryTransform(LastQueryTransformNode node, 
NodeGroupContext context) {
+    return processOneChildNode(node, context);
+  }
+
   @Override
   public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) {
     return processMultiChildNode(node, context);
@@ -281,46 +281,38 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     }
 
     MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
-    List<PlanNode> visitedChildren = new ArrayList<>();
-    node.getChildren()
-        .forEach(
-            child -> {
-              visitedChildren.add(visit(child, context));
-            });
+    List<PlanNode> visitedChildren =
+        node.getChildren().stream()
+            .map(child -> visit(child, context))
+            .collect(Collectors.toList());
 
     TRegionReplicaSet dataRegion;
-    NodeDistributionType distributionType;
+    boolean isChildrenDistributionSame = 
nodeDistributionIsSame(visitedChildren, context);
+    NodeDistributionType distributionType =
+        isChildrenDistributionSame
+            ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
+            : NodeDistributionType.SAME_WITH_SOME_CHILD;
     if (context.isAlignByDevice()) {
       // For align by device,
       // if dataRegions of children are the same, we set child's dataRegion to 
this node,
       // else we set the selected mostlyUsedDataRegion to this node
-      boolean inSame = nodeDistributionIsSame(visitedChildren, context);
       dataRegion =
-          inSame
+          isChildrenDistributionSame
               ? 
context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).region
               : context.getMostlyUsedDataRegion();
       context.putNodeDistribution(
-          newNode.getPlanNodeId(),
-          new NodeDistribution(
-              inSame
-                  ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
-                  : NodeDistributionType.SAME_WITH_SOME_CHILD,
-              dataRegion));
+          newNode.getPlanNodeId(), new NodeDistribution(distributionType, 
dataRegion));
     } else {
       // TODO For align by time, we keep old logic for now
       dataRegion = calculateDataRegionByChildren(visitedChildren, context);
-      distributionType =
-          nodeDistributionIsSame(visitedChildren, context)
-              ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
-              : NodeDistributionType.SAME_WITH_SOME_CHILD;
       context.putNodeDistribution(
           newNode.getPlanNodeId(), new NodeDistribution(distributionType, 
dataRegion));
+    }
 
-      // If the distributionType of all the children are same, no ExchangeNode 
need to be added.
-      if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
-        newNode.setChildren(visitedChildren);
-        return newNode;
-      }
+    // If the distributionType of all the children are same, no ExchangeNode 
need to be added.
+    if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
+      newNode.setChildren(visitedChildren);
+      return newNode;
     }
 
     // Otherwise, we need to add ExchangeNode for the child whose DataRegion 
is different from the
@@ -381,6 +373,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
 
   private TRegionReplicaSet calculateDataRegionByChildren(
       List<PlanNode> children, NodeGroupContext context) {
+
     // Step 1: calculate the count of children group by DataRegion.
     Map<TRegionReplicaSet, Long> groupByRegion =
         children.stream()
@@ -397,16 +390,27 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
                       return region;
                     },
                     Collectors.counting()));
-    if (groupByRegion.entrySet().size() == 1) {
-      return groupByRegion.entrySet().iterator().next().getKey();
+
+    if (groupByRegion.size() == 1) {
+      return groupByRegion.keySet().iterator().next();
     }
+
     // Step 2: return the RegionReplicaSet with max count
-    return Collections.max(
-            groupByRegion.entrySet().stream()
-                .filter(e -> e.getKey() != DataPartition.NOT_ASSIGNED)
-                .collect(Collectors.toList()),
-            Map.Entry.comparingByValue())
-        .getKey();
+    long maxRegionCount = -1;
+    TRegionReplicaSet result = null;
+    for (Map.Entry<TRegionReplicaSet, Long> entry : groupByRegion.entrySet()) {
+      if (DataPartition.NOT_ASSIGNED.equals(entry.getKey())) {
+        continue;
+      }
+      if (entry.getKey().equals(context.getMostlyUsedDataRegion())) {
+        return entry.getKey();
+      }
+      if (entry.getValue() > maxRegionCount) {
+        maxRegionCount = entry.getValue();
+        result = entry.getKey();
+      }
+    }
+    return result;
   }
 
   private TRegionReplicaSet calculateSchemaRegionByChildren(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
index 14799292714..9ea36e7994c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
@@ -25,23 +25,31 @@ import 
org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 public class NodeGroupContext {
+
   protected final MPPQueryContext queryContext;
   private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
-  private final boolean isAlignByDevice;
-  private final TRegionReplicaSet mostlyUsedDataRegion;
+  private boolean isAlignByDevice;
+  private TRegionReplicaSet mostlyUsedDataRegion;
   protected boolean hasExchangeNode;
 
-  public NodeGroupContext(MPPQueryContext queryContext, boolean 
isAlignByDevice, PlanNode root) {
+  public NodeGroupContext(MPPQueryContext queryContext, Statement statement, 
PlanNode root) {
     this.queryContext = queryContext;
     this.nodeDistributionMap = new HashMap<>();
-    this.isAlignByDevice = isAlignByDevice;
-    this.mostlyUsedDataRegion = isAlignByDevice ? 
getMostlyUsedDataRegion(root) : null;
+    if (statement instanceof QueryStatement) {
+      this.isAlignByDevice = ((QueryStatement) statement).isAlignByDevice();
+      this.mostlyUsedDataRegion = getMostlyUsedDataRegion(root);
+    } else if (statement instanceof ShowTimeSeriesStatement) {
+      this.mostlyUsedDataRegion = getMostlyUsedDataRegion(root);
+    }
     this.hasExchangeNode = false;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 2bf79e2a113..ff24932bca6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNo
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastSeriesSourceNode;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -183,7 +184,9 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
         });
 
     if (analysis.getStatement() instanceof QueryStatement
-        || analysis.getStatement() instanceof ShowQueriesStatement) {
+        || analysis.getStatement() instanceof ShowQueriesStatement
+        || (analysis.getStatement() instanceof ShowTimeSeriesStatement
+            && ((ShowTimeSeriesStatement) 
analysis.getStatement()).isOrderByHeat())) {
       
fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
     }
     instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 5fd9c4972a6..ec481a98566 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -63,7 +63,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDe
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -480,7 +479,10 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
       LastQueryScanNode node, DistributionPlanContext context) {
     LastQueryNode mergeNode =
         new LastQueryNode(
-            context.queryContext.getQueryId().genPlanNodeId(), 
node.getPartitionTimeFilter(), null);
+            context.queryContext.getQueryId().genPlanNodeId(),
+            node.getPartitionTimeFilter(),
+            null,
+            false);
     return processRawSeriesScan(node, context, mergeNode);
   }
 
@@ -489,7 +491,10 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
       AlignedLastQueryScanNode node, DistributionPlanContext context) {
     LastQueryNode mergeNode =
         new LastQueryNode(
-            context.queryContext.getQueryId().genPlanNodeId(), 
node.getPartitionTimeFilter(), null);
+            context.queryContext.getQueryId().genPlanNodeId(),
+            node.getPartitionTimeFilter(),
+            null,
+            false);
     return processRawSeriesScan(node, context, mergeNode);
   }
 
@@ -619,8 +624,8 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
   public List<PlanNode> visitLastQuery(LastQueryNode node, 
DistributionPlanContext context) {
     // For last query, we need to keep every FI's root node is 
LastQueryMergeNode. So we
     // force every region group have a parent node even if there is only 1 
child for it.
-    context.setForceAddParent(true);
-    PlanNode root = processRawMultiChildNode(node, context);
+    context.setForceAddParent();
+    PlanNode root = processRawMultiChildNode(node, context, true);
     if (context.queryMultiRegion) {
       PlanNode newRoot = genLastQueryRootNode(node, context);
       // add sort op for each if we add LastQueryMergeNode as root
@@ -678,9 +683,10 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     // if the series is from multi regions or order by clause only refer to 
timeseries, use
     // LastQueryMergeNode
     if (context.oneSeriesInMultiRegion || node.needOrderByTimeseries()) {
-      return new LastQueryMergeNode(id, node.getTimeseriesOrdering());
+      return new LastQueryMergeNode(
+          id, node.getTimeseriesOrdering(), 
node.isContainsLastTransformNode());
     }
-    return new LastQueryCollectNode(id);
+    return new LastQueryCollectNode(id, node.isContainsLastTransformNode());
   }
 
   @Override
@@ -691,11 +697,11 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     if (containsAggregationSource(node)) {
       return planAggregationWithTimeJoin(node, context);
     }
-    return Collections.singletonList(processRawMultiChildNode(node, context));
+    return Collections.singletonList(processRawMultiChildNode(node, context, 
false));
   }
 
   private PlanNode processRawMultiChildNode(
-      MultiChildProcessNode node, DistributionPlanContext context) {
+      MultiChildProcessNode node, DistributionPlanContext context, boolean 
isLast) {
     MultiChildProcessNode root = (MultiChildProcessNode) node.clone();
     // Step 1: Get all source nodes. For the node which is not source, add it 
as the child of
     // current TimeJoinNode
@@ -704,9 +710,10 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
       if (child instanceof SeriesSourceNode) {
         // If the child is SeriesScanNode, we need to check whether this node 
should be seperated
         // into several splits.
-        SeriesSourceNode handle = (SeriesSourceNode) child;
+        SeriesSourceNode sourceNode = (SeriesSourceNode) child;
         List<TRegionReplicaSet> dataDistribution =
-            analysis.getPartitionInfo(handle.getPartitionPath(), 
handle.getPartitionTimeFilter());
+            analysis.getPartitionInfo(
+                sourceNode.getPartitionPath(), 
sourceNode.getPartitionTimeFilter());
         if (dataDistribution.size() > 1) {
           // We mark this variable to `true` if there is some series which is 
distributed in multi
           // DataRegions
@@ -715,17 +722,17 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
         // If the size of dataDistribution is m, this SeriesScanNode should be 
seperated into m
         // SeriesScanNode.
         for (TRegionReplicaSet dataRegion : dataDistribution) {
-          SeriesSourceNode split = (SeriesSourceNode) handle.clone();
+          SeriesSourceNode split = (SeriesSourceNode) sourceNode.clone();
           
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
           split.setRegionReplicaSet(dataRegion);
           sources.add(split);
         }
       }
     }
+
     // Step 2: For the source nodes, group them by the DataRegion.
     Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
         
sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
-
     if (sourceGroup.size() > 1) {
       context.setQueryMultiRegion(true);
     }
@@ -734,35 +741,31 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     // and make the
     // new TimeJoinNode as the child of current TimeJoinNode
     // TODO: (xingtanzjr) optimize the procedure here to remove duplicated 
TimeJoinNode
-    final boolean[] addParent = {false};
-    sourceGroup.forEach(
-        (dataRegion, seriesScanNodes) -> {
-          if (seriesScanNodes.size() == 1 && !context.forceAddParent) {
-            root.addChild(seriesScanNodes.get(0));
-          } else {
-            // If there is only one RegionGroup here, we should not create new 
MultiChildNode as the
-            // parent.
-            // If the size of RegionGroup is larger than 1, we need to 
consider the value of
-            // `forceAddParent`.
-            // If `forceAddParent` is true, we should not create new 
MultiChildNode as the parent,
-            // either.
-            // At last, we can use the parameter `addParent[0]` to judge 
whether to create new
-            // MultiChildNode.
-            boolean appendToRootDirectly =
-                sourceGroup.size() == 1 || (!addParent[0] && 
!context.forceAddParent);
-            if (appendToRootDirectly) {
-              seriesScanNodes.forEach(root::addChild);
-              addParent[0] = true;
-            } else {
-              // We clone a TimeJoinNode from root to make the params to be 
consistent.
-              // But we need to assign a new ID to it
-              MultiChildProcessNode parentOfGroup = (MultiChildProcessNode) 
root.clone();
-              
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-              seriesScanNodes.forEach(parentOfGroup::addChild);
-              root.addChild(parentOfGroup);
-            }
-          }
-        });
+    boolean addParent = false;
+    for (List<SourceNode> seriesScanNodes : sourceGroup.values()) {
+      if (seriesScanNodes.size() == 1 && (!context.forceAddParent || !isLast)) 
{
+        root.addChild(seriesScanNodes.get(0));
+        continue;
+      }
+      // If there is only one RegionGroup, we should not create new MultiChildNode 
as the parent.
+      // If there is more than one RegionGroup, we need to consider the value of 
`forceAddParent`.
+      // If `forceAddParent` is false, the first group reaching this point is added 
to root directly.
+      // Otherwise (`forceAddParent` is true, or `addParent` is already set) we 
create a new
+      // MultiChildNode as the parent of the group.
+      boolean appendToRootDirectly =
+          sourceGroup.size() == 1 || (!addParent && !context.forceAddParent);
+      if (appendToRootDirectly) {
+        seriesScanNodes.forEach(root::addChild);
+        addParent = true;
+      } else {
+        // We clone a TimeJoinNode from root to make the params to be 
consistent.
+        // But we need to assign a new ID to it
+        MultiChildProcessNode parentOfGroup = (MultiChildProcessNode) 
root.clone();
+        
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+        seriesScanNodes.forEach(parentOfGroup::addChild);
+        root.addChild(parentOfGroup);
+      }
+    }
 
     // Process the other children which are not SeriesSourceNode
     for (PlanNode child : node.getChildren()) {
@@ -777,10 +780,6 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     return root;
   }
 
-  private boolean isAggregationQuery() {
-    return ((QueryStatement) analysis.getStatement()).isAggregationQuery();
-  }
-
   private boolean containsAggregationSource(TimeJoinNode node) {
     for (PlanNode child : node.getChildren()) {
       if (child instanceof SeriesAggregationScanNode
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
index 38741d91e0e..0bceb53eb5c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
@@ -60,10 +60,7 @@ public class SubPlan {
   public List<PlanFragment> getPlanFragmentList() {
     List<PlanFragment> result = new ArrayList<>();
     result.add(this.planFragment);
-    this.children.forEach(
-        child -> {
-          result.addAll(child.getPlanFragmentList());
-        });
+    this.children.forEach(child -> result.addAll(child.getPlanFragmentList()));
     return result;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index ec63414d2f7..820a41352aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -45,6 +45,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
@@ -423,6 +424,14 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
     return render(node, boxValue, context);
   }
 
+  @Override
+  public List<String> visitLastQueryTransform(LastQueryTransformNode node, 
GraphContext context) {
+    List<String> boxValue = new ArrayList<>();
+    boxValue.add(String.format("LastQueryTransform-%s", 
node.getPlanNodeId().getId()));
+    boxValue.add(String.format("ViewPath: %s", node.getViewPath()));
+    return render(node, boxValue, context);
+  }
+
   @Override
   public List<String> visitHorizontallyConcat(HorizontallyConcatNode node, 
GraphContext context) {
     List<String> boxValue = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 2364aef9d7e..98be8a79971 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -78,6 +78,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
@@ -180,7 +181,8 @@ public enum PlanNodeType {
   LOGICAL_VIEW_SCHEMA_SCAN((short) 77),
   ALTER_LOGICAL_VIEW((short) 78),
   PIPE_ENRICHED_INSERT((short) 79),
-  ;
+  FORECAST((short) 80),
+  LAST_QUERY_TRANSFORM((short) 81);
 
   public static final int BYTES = Short.BYTES;
 
@@ -385,6 +387,8 @@ public enum PlanNodeType {
         return AlterLogicalViewNode.deserialize(buffer);
       case 79:
         return PipeEnrichedInsertNode.deserialize(buffer);
+      case 81:
+        return LastQueryTransformNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index e6eb22fc5a2..965e83beb15 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -78,6 +78,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
@@ -234,6 +235,10 @@ public abstract class PlanVisitor<R, C> {
     return visitMultiChildProcess(node, context);
   }
 
+  public R visitLastQueryTransform(LastQueryTransformNode node, C context) {
+    return visitSingleChildProcess(node, context);
+  }
+
   public R visitMergeSort(MergeSortNode node, C context) {
     return visitMultiChildProcess(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
index e38b851e8dd..fbba5c6d336 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
@@ -34,8 +34,11 @@ import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.Last
 
 public class LastQueryCollectNode extends MultiChildProcessNode {
 
-  public LastQueryCollectNode(PlanNodeId id) {
+  private boolean containsLastTransformNode;
+
+  public LastQueryCollectNode(PlanNodeId id, boolean 
containsLastTransformNode) {
     super(id);
+    this.containsLastTransformNode = containsLastTransformNode;
   }
 
   public LastQueryCollectNode(PlanNodeId id, List<PlanNode> children) {
@@ -54,7 +57,7 @@ public class LastQueryCollectNode extends 
MultiChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new LastQueryCollectNode(getPlanNodeId());
+    return new LastQueryCollectNode(getPlanNodeId(), 
containsLastTransformNode);
   }
 
   @Override
@@ -99,11 +102,19 @@ public class LastQueryCollectNode extends 
MultiChildProcessNode {
 
   public static LastQueryCollectNode deserialize(ByteBuffer byteBuffer) {
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new LastQueryCollectNode(planNodeId);
+    return new LastQueryCollectNode(planNodeId, false);
   }
 
   @Override
   public void setChildren(List<PlanNode> children) {
     this.children = children;
   }
+
+  public boolean isContainsLastTransformNode() {
+    return this.containsLastTransformNode;
+  }
+
+  public void setContainsLastQueryTransformNode() {
+    this.containsLastTransformNode = true;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java
index 3c240f48903..7bdc4b245b0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java
@@ -40,14 +40,14 @@ public class LastQueryMergeNode extends 
MultiChildProcessNode {
   // The size of this list is 2 and the first SortItem in this list has higher 
priority.
   private final Ordering timeseriesOrdering;
 
-  public LastQueryMergeNode(PlanNodeId id, Ordering timeseriesOrdering) {
-    super(id);
-    this.timeseriesOrdering = timeseriesOrdering;
-  }
+  // whether any child of this node is a LastQueryTransformNode
+  private boolean containsLastTransformNode;
 
-  public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children, Ordering 
timeseriesOrdering) {
-    super(id, children);
+  public LastQueryMergeNode(
+      PlanNodeId id, Ordering timeseriesOrdering, boolean 
containsLastTransformNode) {
+    super(id);
     this.timeseriesOrdering = timeseriesOrdering;
+    this.containsLastTransformNode = containsLastTransformNode;
   }
 
   @Override
@@ -62,7 +62,7 @@ public class LastQueryMergeNode extends MultiChildProcessNode 
{
 
   @Override
   public PlanNode clone() {
-    return new LastQueryMergeNode(getPlanNodeId(), timeseriesOrdering);
+    return new LastQueryMergeNode(getPlanNodeId(), timeseriesOrdering, 
containsLastTransformNode);
   }
 
   @Override
@@ -135,7 +135,7 @@ public class LastQueryMergeNode extends 
MultiChildProcessNode {
       timeseriesOrdering = 
Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
     }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new LastQueryMergeNode(planNodeId, timeseriesOrdering);
+    return new LastQueryMergeNode(planNodeId, timeseriesOrdering, false);
   }
 
   @Override
@@ -146,4 +146,12 @@ public class LastQueryMergeNode extends 
MultiChildProcessNode {
   public Ordering getTimeseriesOrdering() {
     return timeseriesOrdering;
   }
+
+  public boolean isContainsLastTransformNode() {
+    return this.containsLastTransformNode;
+  }
+
+  public void setContainsNonWritableView() {
+    this.containsLastTransformNode = true;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
index a47810a0076..0d6ade61aad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
@@ -46,20 +46,30 @@ public class LastQueryNode extends MultiChildProcessNode {
   // which is set to null if there is no need to sort
   private Ordering timeseriesOrdering;
 
-  public LastQueryNode(PlanNodeId id, Filter timeFilter, @Nullable Ordering 
timeseriesOrdering) {
+  // whether any child is a LastQueryTransformNode; this flag is only used in 
the distribution plan
+  private boolean containsLastTransformNode;
+
+  public LastQueryNode(
+      PlanNodeId id,
+      Filter timeFilter,
+      @Nullable Ordering timeseriesOrdering,
+      boolean containsLastTransformNode) {
     super(id);
     this.timeFilter = timeFilter;
     this.timeseriesOrdering = timeseriesOrdering;
+    this.containsLastTransformNode = containsLastTransformNode;
   }
 
   public LastQueryNode(
       PlanNodeId id,
       List<PlanNode> children,
       Filter timeFilter,
-      @Nullable Ordering timeseriesOrdering) {
+      @Nullable Ordering timeseriesOrdering,
+      boolean containsLastTransformNode) {
     super(id, children);
     this.timeFilter = timeFilter;
     this.timeseriesOrdering = timeseriesOrdering;
+    this.containsLastTransformNode = containsLastTransformNode;
   }
 
   @Override
@@ -74,7 +84,8 @@ public class LastQueryNode extends MultiChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new LastQueryNode(getPlanNodeId(), timeFilter, timeseriesOrdering);
+    return new LastQueryNode(
+        getPlanNodeId(), timeFilter, timeseriesOrdering, 
containsLastTransformNode);
   }
 
   @Override
@@ -163,7 +174,7 @@ public class LastQueryNode extends MultiChildProcessNode {
       timeseriesOrdering = 
Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
     }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new LastQueryNode(planNodeId, timeFilter, timeseriesOrdering);
+    return new LastQueryNode(planNodeId, timeFilter, timeseriesOrdering, 
false);
   }
 
   @Override
@@ -184,6 +195,14 @@ public class LastQueryNode extends MultiChildProcessNode {
     this.timeseriesOrdering = timeseriesOrdering;
   }
 
+  public boolean isContainsLastTransformNode() {
+    return this.containsLastTransformNode;
+  }
+
+  public void setContainsLastTransformNode() {
+    this.containsLastTransformNode = true;
+  }
+
   public boolean needOrderByTimeseries() {
     return timeseriesOrdering != null;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
similarity index 52%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
index e38b851e8dd..108189259e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
@@ -16,13 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -32,78 +34,96 @@ import java.util.Objects;
 
 import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
 
-public class LastQueryCollectNode extends MultiChildProcessNode {
+public class LastQueryTransformNode extends SingleChildProcessNode {
+
+  private final String viewPath;
 
-  public LastQueryCollectNode(PlanNodeId id) {
+  private final String dataType;
+
+  public LastQueryTransformNode(PlanNodeId id, String viewPath, String 
dataType) {
     super(id);
+    this.viewPath = viewPath;
+    this.dataType = dataType;
   }
 
-  public LastQueryCollectNode(PlanNodeId id, List<PlanNode> children) {
-    super(id, children);
+  public LastQueryTransformNode(PlanNodeId id, PlanNode aggNode, String 
viewPath, String dataType) {
+    super(id, aggNode);
+    this.viewPath = viewPath;
+    this.dataType = dataType;
   }
 
   @Override
-  public List<PlanNode> getChildren() {
-    return children;
+  public PlanNode clone() {
+    return new LastQueryTransformNode(getPlanNodeId(), viewPath, dataType);
   }
 
   @Override
-  public void addChild(PlanNode child) {
-    children.add(child);
+  public List<String> getOutputColumnNames() {
+    return LAST_QUERY_HEADER_COLUMNS;
   }
 
   @Override
-  public PlanNode clone() {
-    return new LastQueryCollectNode(getPlanNodeId());
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.LAST_QUERY_TRANSFORM.serialize(byteBuffer);
+    ReadWriteIOUtils.write(viewPath, byteBuffer);
+    ReadWriteIOUtils.write(dataType, byteBuffer);
   }
 
   @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.LAST_QUERY_TRANSFORM.serialize(stream);
+    ReadWriteIOUtils.write(viewPath, stream);
+    ReadWriteIOUtils.write(dataType, stream);
   }
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return LAST_QUERY_HEADER_COLUMNS;
+  public static LastQueryTransformNode deserialize(ByteBuffer byteBuffer) {
+    String viewPath = ReadWriteIOUtils.readString(byteBuffer);
+    String dataType = ReadWriteIOUtils.readString(byteBuffer);
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new LastQueryTransformNode(planNodeId, viewPath, dataType);
   }
 
   @Override
-  public String toString() {
-    return String.format("LastQueryCollectNode-%s", this.getPlanNodeId());
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitLastQueryTransform(this, context);
   }
 
   @Override
   public boolean equals(Object o) {
-    return super.equals(o);
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    LastQueryTransformNode that = (LastQueryTransformNode) o;
+    return viewPath.equals(that.viewPath) && dataType.equals(that.dataType);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode());
-  }
-
-  @Override
-  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-    return visitor.visitLastQueryCollect(this, context);
+    return Objects.hash(super.hashCode(), viewPath, dataType);
   }
 
   @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {
-    PlanNodeType.LAST_QUERY_COLLECT.serialize(byteBuffer);
+  public String toString() {
+    return String.format(
+        "LastQueryTransformNode-%s:[ViewPath: %s, DataType: %s]",
+        this.getPlanNodeId(), viewPath, dataType);
   }
 
-  @Override
-  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
-    PlanNodeType.LAST_QUERY_COLLECT.serialize(stream);
+  public String getViewPath() {
+    return this.viewPath;
   }
 
-  public static LastQueryCollectNode deserialize(ByteBuffer byteBuffer) {
-    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new LastQueryCollectNode(planNodeId);
+  public String getDataType() {
+    return this.dataType;
   }
 
-  @Override
-  public void setChildren(List<PlanNode> children) {
-    this.children = children;
+  public String getOutputSymbolForSort() {
+    return viewPath;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
index 7b7b5e8b500..b2197609e84 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/sink/IdentitySinkNode.java
@@ -87,6 +87,11 @@ public class IdentitySinkNode extends MultiChildrenSinkNode {
     }
   }
 
+  @Override
+  public String toString() {
+    return String.format("IdentitySinkNode-%s", this.getPlanNodeId());
+  }
+
   public static IdentitySinkNode deserialize(ByteBuffer byteBuffer) {
     int size = ReadWriteIOUtils.readInt(byteBuffer);
     List<DownStreamChannelLocation> downStreamChannelLocationList = new 
ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
index 28cecd5f4e1..638c1c81e3f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java
@@ -55,10 +55,10 @@ public class AlignedSeriesScanNode extends SeriesSourceNode 
{
   // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
   private Ordering scanOrder = Ordering.ASC;
 
-  // time filter for current series, could be null if doesn't exist
+  // time filter for current series, could be null if it doesn't exist
   @Nullable private Filter timeFilter;
 
-  // value filter for current series, could be null if doesn't exist
+  // value filter for current series, could be null if it doesn't exist
   @Nullable private Filter valueFilter;
 
   // Limit for result set. The default value is -1, which means no limit
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
index 539d90bc84d..0231def964d 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
@@ -48,7 +48,6 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.process.fill.IFill;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.fill.linear.LinearFill;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.HorizontallyConcatOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.RowBasedTimeJoinOperator;
-import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.AbstractUpdateLastCacheOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryCollectOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryMergeOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryOperator;
@@ -346,7 +345,7 @@ public class OperatorMemoryTest {
   public void lastQueryOperatorTest() {
     TsBlockBuilder builder = Mockito.mock(TsBlockBuilder.class);
     Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(1024L);
-    List<AbstractUpdateLastCacheOperator> children = new ArrayList<>(4);
+    List<Operator> children = new ArrayList<>(4);
     long expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
     for (int i = 0; i < 4; i++) {
       UpdateLastCacheOperator child = 
Mockito.mock(UpdateLastCacheOperator.class);
@@ -376,7 +375,7 @@ public class OperatorMemoryTest {
     TsBlock tsBlock = Mockito.mock(TsBlock.class);
     Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L);
     Mockito.when(tsBlock.getPositionCount()).thenReturn(16);
-    List<AbstractUpdateLastCacheOperator> children = new ArrayList<>(4);
+    List<Operator> children = new ArrayList<>(4);
 
     for (int i = 0; i < 4; i++) {
       UpdateLastCacheOperator child = 
Mockito.mock(UpdateLastCacheOperator.class);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java
index b36feab3cff..87ffe9baca7 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java
@@ -143,7 +143,7 @@ public class QueryLogicalPlanUtil {
 
     LastQueryNode lastQueryNode =
         new LastQueryNode(
-            queryId.genPlanNodeId(), sourceNodeList, TimeFilter.gt(100), 
Ordering.ASC);
+            queryId.genPlanNodeId(), sourceNodeList, TimeFilter.gt(100), 
Ordering.ASC, false);
 
     querySQLs.add(sql);
     sqlToPlanMap.put(sql, lastQueryNode);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java
index db875ef9b38..13d1586d475 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java
@@ -209,7 +209,7 @@ public class LastQueryTest {
     }
 
     PlanNode root =
-        new LastQueryNode(context.getQueryId().genPlanNodeId(), 
sourceNodeList, null, null);
+        new LastQueryNode(context.getQueryId().genPlanNodeId(), 
sourceNodeList, null, null, false);
     return new LogicalQueryPlan(context, root);
   }
 }

Reply via email to