This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch support_exists_and_correlate
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fb6c5e51a63add52421457cff4843d83264f7c27
Author: lancelly <[email protected]>
AuthorDate: Fri Jan 10 16:42:24 2025 +0800

    add AssignUniqueId
---
 .../operator/process/AssignUniqueIdOperator.java   | 142 +++++
 .../plan/planner/TableOperatorGenerator.java       |  15 +
 .../plan/planner/plan/node/PlanGraphPrinter.java   |  10 +
 .../plan/planner/plan/node/PlanNodeType.java       |   1 +
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../iterative/rule/PruneAssignUniqueIdColumns.java |  47 ++
 .../rule/TransformExistsApplyToCorrelatedJoin.java | 215 +++++++
 .../relational/planner/node/AssignUniqueId.java    |  97 +++
 .../plan/relational/planner/node/Patterns.java     |   8 +-
 .../optimizations/LogicalOptimizeFactory.java      |   2 +
 .../optimizations/PlanNodeDecorrelator.java        | 699 +++++++++++++++++++++
 .../optimizations/PushPredicateIntoTableScan.java  |  15 +
 ...mQuantifiedComparisonApplyToCorrelatedJoin.java |  27 +-
 .../optimizations/UnaliasSymbolReferences.java     |  13 +
 .../relational/planner/optimizations/Util.java     |  19 +
 .../planner/assertions/AssignUniqueIdMatcher.java  |  49 ++
 .../planner/assertions/PlanMatchPattern.java       |   6 +
 17 files changed, 1344 insertions(+), 26 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AssignUniqueIdOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AssignUniqueIdOperator.java
new file mode 100644
index 00000000000..97a64293805
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AssignUniqueIdOperator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTaskId;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+
+public class AssignUniqueIdOperator implements ProcessOperator {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(AssignUniqueIdOperator.class);
+
+  private static final long ROW_IDS_PER_REQUEST = 1L << 20L;
+  private static final long MAX_ROW_ID = 1L << 40L;
+
+  private final OperatorContext operatorContext;
+  private final Operator child;
+
+  private final AtomicLong rowIdPool = new AtomicLong();
+  private final long uniqueValueMask;
+
+  private long rowIdCounter;
+  private long maxRowIdCounterValue;
+
+  public AssignUniqueIdOperator(OperatorContext operatorContext, Operator 
child) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+
+    DriverTaskId id = operatorContext.getDriverContext().getDriverTaskID();
+    this.uniqueValueMask =
+        (((long) id.getFragmentId().getId()) << 54) | (((long) 
id.getPipelineId()) << 40);
+    requestValues();
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    TsBlock tsBlock = child.next();
+    if (tsBlock == null || tsBlock.isEmpty()) {
+      return null;
+    }
+    return tsBlock.appendValueColumns(new Column[] 
{generateIdColumn(tsBlock.getPositionCount())});
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    return child.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (child != null) {
+      child.close();
+    }
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return child.isFinished();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize()
+        + (long) Long.SIZE * 
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(child)
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  private Column generateIdColumn(int positionCount) {
+    LongColumnBuilder columnBuilder = new LongColumnBuilder(null, 
positionCount);
+    for (int currentPosition = 0; currentPosition < positionCount; 
currentPosition++) {
+      if (rowIdCounter >= maxRowIdCounterValue) {
+        requestValues();
+      }
+      long rowId = rowIdCounter++;
+      verify((rowId & uniqueValueMask) == 0, "RowId and uniqueValue mask 
overlaps");
+      columnBuilder.writeLong(uniqueValueMask | rowId);
+    }
+    return columnBuilder.build();
+  }
+
+  private void requestValues() {
+    rowIdCounter = rowIdPool.getAndAdd(ROW_IDS_PER_REQUEST);
+    maxRowIdCounterValue = Math.min(rowIdCounter + ROW_IDS_PER_REQUEST, 
MAX_ROW_ID);
+    checkState(rowIdCounter < MAX_ROW_ID, "Unique row id exceeds a limit: %s", 
MAX_ROW_ID);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 9ca7ddda112..1d99562e5ed 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -129,6 +129,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
@@ -1580,6 +1581,20 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     return new EnforceSingleRowOperator(operatorContext, child);
   }
 
+  @Override
+  public Operator visitAssignUniqueId(AssignUniqueId node, 
LocalExecutionPlanContext context) {
+    Operator child = node.getChild().accept(this, context);
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                EnforceSingleRowOperator.class.getSimpleName());
+
+    return new EnforceSingleRowOperator(operatorContext, child);
+  }
+
   @Override
   public Operator visitCountMerge(
       final CountSchemaMergeNode node, final LocalExecutionPlanContext 
context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 442d59f8be3..c61ab511168 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -66,6 +66,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAg
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode;
@@ -947,6 +948,15 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
     return render(node, boxValue, context);
   }
 
+  @Override
+  public List<String> visitAssignUniqueId(AssignUniqueId node, GraphContext 
context) {
+    List<String> boxValue = new ArrayList<>();
+    boxValue.add(String.format("AssignUniqueId-%s", 
node.getPlanNodeId().getId()));
+    boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols()));
+    boxValue.add(String.format("IdColumnSymbol: %s", node.getIdColumn()));
+    return render(node, boxValue, context);
+  }
+
   private String printRegion(TRegionReplicaSet regionReplicaSet) {
     return String.format(
         "Partition: %s",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 3e86d189e1e..0dd5788970f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -290,6 +290,7 @@ public enum PlanNodeType {
   TREE_ALIGNED_DEVICE_VIEW_SCAN_NODE((short) 1023),
   TREE_NONALIGNED_DEVICE_VIEW_SCAN_NODE((short) 1024),
   TABLE_SEMI_JOIN_NODE((short) 1025),
+  TABLE_ASSIGN_UNIQUE_ID((short) 1026),
 
   RELATIONAL_INSERT_TABLET((short) 2000),
   RELATIONAL_INSERT_ROW((short) 2001),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index d04c57e77fb..bf0099d17fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -654,6 +654,11 @@ public abstract class PlanVisitor<R, C> {
     return visitTwoChildProcess(node, context);
   }
 
+  public R visitAssignUniqueId(
+      
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId 
node, C context) {
+    return visitSingleChildProcess(node, context);
+  }
+
   public R visitEnforceSingleRow(
       
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode
 node,
       C context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneAssignUniqueIdColumns.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneAssignUniqueIdColumns.java
new file mode 100644
index 00000000000..a5c675b5fc8
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneAssignUniqueIdColumns.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
+
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.restrictChildOutputs;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.assignUniqueId;
+
+public class PruneAssignUniqueIdColumns extends 
ProjectOffPushDownRule<AssignUniqueId> {
+  public PruneAssignUniqueIdColumns() {
+    super(assignUniqueId());
+  }
+
+  @Override
+  protected Optional<PlanNode> pushDownProjectOff(
+      Context context, AssignUniqueId assignUniqueId, Set<Symbol> 
referencedOutputs) {
+    // remove unused AssignUniqueId node
+    if (!referencedOutputs.contains(assignUniqueId.getIdColumn())) {
+      return Optional.of(assignUniqueId.getChild());
+    }
+
+    return restrictChildOutputs(context.getIdAllocator(), assignUniqueId, 
referencedOutputs);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/TransformExistsApplyToCorrelatedJoin.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/TransformExistsApplyToCorrelatedJoin.java
new file mode 100644
index 00000000000..86f1ec96a69
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/TransformExistsApplyToCorrelatedJoin.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanNodeDecorrelator;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CoalesceExpression;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.read.common.type.LongType;
+
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.globalAggregation;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleAggregation;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.INNER;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.LEFT;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.applyNode;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Util.getResolvedBuiltInAggregateFunction;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.GREATER_THAN;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toSqlType;
+import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
+
+/**
+ * EXISTS is modeled as (if correlated predicates are equality comparisons):
+ *
+ * <pre>
+ *     - Project(exists := COALESCE(subqueryTrue, false))
+ *       - CorrelatedJoin(LEFT)
+ *         - input
+ *         - Project(subqueryTrue := true)
+ *           - Limit(count=1)
+ *             - subquery
+ * </pre>
+ *
+ * or:
+ *
+ * <pre>
+ *     - CorrelatedJoin(LEFT)
+ *       - input
+ *       - Project($0 > 0)
+ *         - Aggregation(COUNT(*))
+ *           - subquery
+ * </pre>
+ *
+ * otherwise
+ */
+public class TransformExistsApplyToCorrelatedJoin implements Rule<ApplyNode> {
+  private static final Pattern<ApplyNode> PATTERN = applyNode();
+
+  private final PlannerContext plannerContext;
+
+  public TransformExistsApplyToCorrelatedJoin(PlannerContext plannerContext) {
+    this.plannerContext = requireNonNull(plannerContext, "plannerContext is 
null");
+  }
+
+  @Override
+  public Pattern<ApplyNode> getPattern() {
+    return PATTERN;
+  }
+
+  @Override
+  public Result apply(ApplyNode parent, Captures captures, Context context) {
+    if (parent.getSubqueryAssignments().size() != 1) {
+      return Result.empty();
+    }
+
+    ApplyNode.SetExpression expression = 
getOnlyElement(parent.getSubqueryAssignments().values());
+    if (!(expression instanceof ApplyNode.Exists)) {
+      return Result.empty();
+    }
+
+    /*
+    Empty correlation list indicates that the subquery contains no correlation 
symbols from the
+    immediate outer scope. The subquery might be either not correlated at all, 
or correlated with
+    symbols from further outer scope.
+    Currently, the two cases are indistinguishable.
+    To support the latter case, the ApplyNode with empty correlation list is 
rewritten to default
+    aggregation, which is inefficient in the rare case of uncorrelated EXISTS 
subquery,
+    but currently allows to successfully decorrelate a correlated EXISTS 
subquery.
+
+    TODO: remove this condition when exploratory optimizer is implemented or 
support for decorrelating joins is implemented in PlanNodeDecorrelator
+    */
+    if (parent.getCorrelation().isEmpty()) {
+      return Result.ofPlanNode(rewriteToDefaultAggregation(parent, context));
+    }
+
+    Optional<PlanNode> nonDefaultAggregation = 
rewriteToNonDefaultAggregation(parent, context);
+    return nonDefaultAggregation
+        .map(Result::ofPlanNode)
+        .orElseGet(() -> Result.ofPlanNode(rewriteToDefaultAggregation(parent, 
context)));
+  }
+
+  private Optional<PlanNode> rewriteToNonDefaultAggregation(ApplyNode 
applyNode, Context context) {
+    checkState(
+        applyNode.getSubquery().getOutputSymbols().isEmpty(),
+        "Expected subquery output symbols to be pruned");
+
+    Symbol subqueryTrue = 
context.getSymbolAllocator().newSymbol("subqueryTrue", BOOLEAN);
+
+    PlanNode subquery =
+        new ProjectNode(
+            context.getIdAllocator().genPlanNodeId(),
+            new LimitNode(
+                context.getIdAllocator().genPlanNodeId(),
+                applyNode.getSubquery(),
+                1L,
+                Optional.empty()),
+            Assignments.of(subqueryTrue, TRUE_LITERAL));
+
+    PlanNodeDecorrelator decorrelator =
+        new PlanNodeDecorrelator(plannerContext, context.getSymbolAllocator(), 
context.getLookup());
+    if (!decorrelator.decorrelateFilters(subquery, 
applyNode.getCorrelation()).isPresent()) {
+      return Optional.empty();
+    }
+
+    Symbol exists = 
getOnlyElement(applyNode.getSubqueryAssignments().keySet());
+    Assignments.Builder assignments =
+        Assignments.builder()
+            .putIdentities(applyNode.getInput().getOutputSymbols())
+            .put(
+                exists,
+                new CoalesceExpression(
+                    ImmutableList.of(
+                        subqueryTrue.toSymbolReference(), 
BooleanLiteral.FALSE_LITERAL)));
+
+    return Optional.of(
+        new ProjectNode(
+            context.getIdAllocator().genPlanNodeId(),
+            new CorrelatedJoinNode(
+                applyNode.getPlanNodeId(),
+                applyNode.getInput(),
+                subquery,
+                applyNode.getCorrelation(),
+                LEFT,
+                TRUE_LITERAL,
+                applyNode.getOriginSubquery()),
+            assignments.build()));
+  }
+
+  private PlanNode rewriteToDefaultAggregation(ApplyNode applyNode, Context 
context) {
+    ResolvedFunction countFunction =
+        getResolvedBuiltInAggregateFunction(
+            plannerContext.getMetadata(), "count", ImmutableList.of());
+    Symbol count = context.getSymbolAllocator().newSymbol("count", 
LongType.getInstance());
+    Symbol exists = 
getOnlyElement(applyNode.getSubqueryAssignments().keySet());
+
+    return new CorrelatedJoinNode(
+        applyNode.getPlanNodeId(),
+        applyNode.getInput(),
+        new ProjectNode(
+            context.getIdAllocator().genPlanNodeId(),
+            singleAggregation(
+                context.getIdAllocator().genPlanNodeId(),
+                applyNode.getSubquery(),
+                ImmutableMap.of(
+                    count,
+                    new AggregationNode.Aggregation(
+                        countFunction,
+                        ImmutableList.of(),
+                        false,
+                        Optional.empty(),
+                        Optional.empty(),
+                        Optional.empty())),
+                globalAggregation()),
+            Assignments.of(
+                exists,
+                new ComparisonExpression(
+                    GREATER_THAN,
+                    count.toSymbolReference(),
+                    new Cast(new LongLiteral("0"), 
toSqlType(LongType.getInstance()))))),
+        applyNode.getCorrelation(),
+        INNER,
+        TRUE_LITERAL,
+        applyNode.getOriginSubquery());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AssignUniqueId.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AssignUniqueId.java
new file mode 100644
index 00000000000..3528d8d3649
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AssignUniqueId.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class AssignUniqueId extends SingleChildProcessNode {
+  private final Symbol idColumn;
+
+  public AssignUniqueId(PlanNodeId id, PlanNode child, Symbol idColumn) {
+    super(id);
+    this.child = child;
+    this.idColumn = requireNonNull(idColumn, "idColumn is null");
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitAssignUniqueId(this, context);
+  }
+
+  @Override
+  public List<Symbol> getOutputSymbols() {
+    return 
ImmutableList.<Symbol>builder().addAll(child.getOutputSymbols()).add(idColumn).build();
+  }
+
+  @Override
+  public PlanNode clone() {
+    // clone without child
+    return new AssignUniqueId(id, null, idColumn);
+  }
+
+  @Override
+  public PlanNode replaceChildren(List<PlanNode> newChildren) {
+    checkArgument(newChildren.size() == 1, "expected newChildren to contain 1 
node");
+    return new AssignUniqueId(getPlanNodeId(), 
Iterables.getOnlyElement(newChildren), idColumn);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.TABLE_ASSIGN_UNIQUE_ID.serialize(byteBuffer);
+    Symbol.serialize(idColumn, byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.TABLE_ASSIGN_UNIQUE_ID.serialize(stream);
+    Symbol.serialize(idColumn, stream);
+  }
+
+  public static AssignUniqueId deserialize(ByteBuffer byteBuffer) {
+    Symbol idColumn = Symbol.deserialize(byteBuffer);
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new AssignUniqueId(planNodeId, null, idColumn);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    throw new UnsupportedOperationException();
+  }
+
+  public Symbol getIdColumn() {
+    return idColumn;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
index 604e2b06a3a..f676fcd0ca5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
@@ -42,10 +42,10 @@ public final class Patterns {
     return typeOf(AggregationNode.class);
   }
 
-  //  public static Pattern<AssignUniqueId> assignUniqueId()
-  //  {
-  //      return typeOf(AssignUniqueId.class);
-  //  }
+  public static Pattern<AssignUniqueId> assignUniqueId() {
+    return typeOf(AssignUniqueId.class);
+  }
+
   //
   //  public static Pattern<GroupIdNode> groupId()
   //  {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index b3766383c18..de2b245bfd2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Pr
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplyColumns;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplyCorrelation;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplySourceColumns;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneAssignUniqueIdColumns;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneCorrelatedJoinColumns;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneCorrelatedJoinCorrelation;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneDistinctAggregation;
@@ -86,6 +87,7 @@ public class LogicalOptimizeFactory {
             new PruneApplyColumns(),
             new PruneApplyCorrelation(),
             new PruneApplySourceColumns(),
+            new PruneAssignUniqueIdColumns(),
             new PruneCorrelatedJoinColumns(),
             new PruneCorrelatedJoinCorrelation(),
             new PruneEnforceSingleRowColumns(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PlanNodeDecorrelator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PlanNodeDecorrelator.java
new file mode 100644
index 00000000000..8527f74191c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PlanNodeDecorrelator.java
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
+
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.GroupReference;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Lookup;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;
+import org.apache.iotdb.db.queryengine.plan.relational.type.TypeCoercion;
+import org.apache.iotdb.db.queryengine.plan.relational.type.TypeManager;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.DeterminismEvaluator.isDeterministic;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.and;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.combineConjuncts;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.extractConjuncts;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleAggregation;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleGroupingSet;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.SymbolMapper.symbolMapper;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.EQUAL;
+
+public class PlanNodeDecorrelator {
+  private final PlannerContext plannerContext;
+  private final SymbolAllocator symbolAllocator;
+  private final Lookup lookup;
+  private final TypeCoercion typeCoercion;
+
+  public PlanNodeDecorrelator(
+      PlannerContext plannerContext, SymbolAllocator symbolAllocator, Lookup 
lookup) {
+    this.plannerContext = requireNonNull(plannerContext, "plannerContext is 
null");
+    this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is 
null");
+    this.lookup = requireNonNull(lookup, "lookup is null");
+    this.typeCoercion = new 
TypeCoercion(plannerContext.getTypeManager()::getType);
+  }
+
+  public Optional<DecorrelatedNode> decorrelateFilters(PlanNode node, 
List<Symbol> correlation) {
+    if (correlation.isEmpty()) {
+      return Optional.of(new DecorrelatedNode(ImmutableList.of(), node));
+    }
+
+    Optional<DecorrelationResult> decorrelationResultOptional =
+        node.accept(new DecorrelatingVisitor(plannerContext.getTypeManager(), 
correlation), null);
+    return decorrelationResultOptional.flatMap(
+        decorrelationResult ->
+            decorrelatedNode(
+                decorrelationResult.correlatedPredicates, 
decorrelationResult.node, correlation));
+  }
+
+  private class DecorrelatingVisitor extends 
PlanVisitor<Optional<DecorrelationResult>, Void> {
+    private final TypeManager typeManager;
+    private final List<Symbol> correlation;
+
+    DecorrelatingVisitor(TypeManager typeManager, List<Symbol> correlation) {
+      this.typeManager = requireNonNull(typeManager, "typeManager is null");
+      this.correlation = requireNonNull(correlation, "correlation is null");
+    }
+
+    @Override
+    public Optional<DecorrelationResult> visitPlan(PlanNode node, Void 
context) {
+      if (containsCorrelation(node, correlation)) {
+        return Optional.empty();
+      }
+      return Optional.of(
+          new DecorrelationResult(
+              node,
+              ImmutableSet.of(),
+              ImmutableList.of(),
+              ImmutableMultimap.of(),
+              ImmutableSet.of(),
+              false));
+    }
+
+    @Override
+    public Optional<DecorrelationResult> visitGroupReference(GroupReference 
node, Void context) {
+      return lookup.resolve(node).accept(this, null);
+    }
+
+    @Override
+    public Optional<DecorrelationResult> visitFilter(FilterNode node, Void 
context) {
+      Optional<DecorrelationResult> childDecorrelationResultOptional =
+          Optional.of(
+              new DecorrelationResult(
+                  node.getChild(),
+                  ImmutableSet.of(),
+                  ImmutableList.of(),
+                  ImmutableMultimap.of(),
+                  ImmutableSet.of(),
+                  false));
+
+      // try to decorrelate filters down the tree
+      if (containsCorrelation(node.getChild(), correlation)) {
+        childDecorrelationResultOptional = node.getChild().accept(this, null);
+      }
+
+      if (!childDecorrelationResultOptional.isPresent()) {
+        return Optional.empty();
+      }
+
+      Expression predicate = node.getPredicate();
+      Map<Boolean, List<Expression>> predicates =
+          extractConjuncts(predicate).stream()
+              
.collect(Collectors.partitioningBy(DecorrelatingVisitor.this::isCorrelated));
+      List<Expression> correlatedPredicates = 
ImmutableList.copyOf(predicates.get(true));
+      List<Expression> uncorrelatedPredicates = 
ImmutableList.copyOf(predicates.get(false));
+
+      DecorrelationResult childDecorrelationResult = 
childDecorrelationResultOptional.get();
+      FilterNode newFilterNode =
+          new FilterNode(
+              node.getPlanNodeId(),
+              childDecorrelationResult.node,
+              combineConjuncts(uncorrelatedPredicates));
+
+      Set<Symbol> symbolsToPropagate =
+          Sets.difference(
+              SymbolsExtractor.extractUnique(correlatedPredicates),
+              ImmutableSet.copyOf(correlation));
+      return Optional.of(
+          new DecorrelationResult(
+              newFilterNode,
+              Sets.union(childDecorrelationResult.symbolsToPropagate, 
symbolsToPropagate),
+              ImmutableList.<Expression>builder()
+                  .addAll(childDecorrelationResult.correlatedPredicates)
+                  .addAll(correlatedPredicates)
+                  .build(),
+              ImmutableMultimap.<Symbol, Symbol>builder()
+                  .putAll(childDecorrelationResult.correlatedSymbolsMapping)
+                  
.putAll(extractCorrelatedSymbolsMapping(correlatedPredicates))
+                  .build(),
+              ImmutableSet.<Symbol>builder()
+                  .addAll(childDecorrelationResult.constantSymbols)
+                  .addAll(extractConstantSymbols(correlatedPredicates))
+                  .build(),
+              childDecorrelationResult.atMostSingleRow));
+    }
+
+    @Override
+    public Optional<DecorrelationResult> visitLimit(LimitNode node, Void 
context) {
+      if (node.getCount() == 0 || node.isWithTies()) {
+        return Optional.empty();
+      }
+
+      Optional<DecorrelationResult> childDecorrelationResultOptional =
+          node.getChild().accept(this, null);
+      if (!childDecorrelationResultOptional.isPresent()) {
+        return Optional.empty();
+      }
+
+      DecorrelationResult childDecorrelationResult = 
childDecorrelationResultOptional.get();
+      if (childDecorrelationResult.atMostSingleRow) {
+        return childDecorrelationResultOptional;
+      }
+
+      if (node.getCount() == 1) {
+        return rewriteLimitWithRowCountOne(childDecorrelationResult, 
node.getPlanNodeId());
+      }
+      throw new SemanticException(
+          "Decorrelation for LIMIT with row count greater than 1 is not 
supported yet");
+      // return 
rewriteLimitWithRowCountGreaterThanOne(childDecorrelationResult, node);
+    }
+
+    // TODO Limit (1) could be decorrelated by the method 
rewriteLimitWithRowCountGreaterThanOne()
+    // as well.
+    // The current decorrelation method for Limit (1) cannot deal with 
subqueries outputting other
+    // symbols
+    // than constants.
+    //
+    // An example query that is currently not supported:
+    // SELECT (
+    //      SELECT a+b
+    //      FROM (VALUES (1, 2), (1, 2)) inner_relation(a, b)
+    //      WHERE a=x
+    //      LIMIT 1)
+    // FROM (VALUES (1)) outer_relation(x)
+    //
+    // Switching the decorrelation method would change the way that queries 
with EXISTS are
+    // executed,
+    // and thus it needs benchmarking.
+    private Optional<DecorrelationResult> rewriteLimitWithRowCountOne(
+        DecorrelationResult childDecorrelationResult, PlanNodeId nodeId) {
+      PlanNode decorrelatedChildNode = childDecorrelationResult.node;
+      Set<Symbol> constantSymbols = 
childDecorrelationResult.getConstantSymbols();
+
+      if (constantSymbols.isEmpty()
+          || 
!constantSymbols.containsAll(decorrelatedChildNode.getOutputSymbols())) {
+        return Optional.empty();
+      }
+
+      // rewrite Limit to aggregation on constant symbols
+      AggregationNode aggregationNode =
+          singleAggregation(
+              nodeId,
+              decorrelatedChildNode,
+              ImmutableMap.of(),
+              singleGroupingSet(decorrelatedChildNode.getOutputSymbols()));
+
+      return Optional.of(
+          new DecorrelationResult(
+              aggregationNode,
+              childDecorrelationResult.symbolsToPropagate,
+              childDecorrelationResult.correlatedPredicates,
+              childDecorrelationResult.correlatedSymbolsMapping,
+              childDecorrelationResult.constantSymbols,
+              true));
+    }
+
+    /*
+    private Optional<DecorrelationResult> 
rewriteLimitWithRowCountGreaterThanOne(
+        DecorrelationResult childDecorrelationResult, LimitNode node) {
+      PlanNode decorrelatedChildNode = childDecorrelationResult.node;
+
+      // no rewrite needed (no symbols to partition by)
+      if (childDecorrelationResult.symbolsToPropagate.isEmpty()) {
+        return Optional.of(
+            new DecorrelationResult(
+                node.replaceChildren(ImmutableList.of(decorrelatedChildNode)),
+                childDecorrelationResult.symbolsToPropagate,
+                childDecorrelationResult.correlatedPredicates,
+                childDecorrelationResult.correlatedSymbolsMapping,
+                childDecorrelationResult.constantSymbols,
+                false));
+      }
+
+      Set<Symbol> constantSymbols = 
childDecorrelationResult.getConstantSymbols();
+      if 
(!constantSymbols.containsAll(childDecorrelationResult.symbolsToPropagate)) {
+        return Optional.empty();
+      }
+
+      // rewrite Limit to RowNumberNode partitioned by constant symbols
+      RowNumberNode rowNumberNode =
+          new RowNumberNode(
+              node.getPlanNodeId(),
+              decorrelatedChildNode,
+              
ImmutableList.copyOf(childDecorrelationResult.symbolsToPropagate),
+              false,
+              symbolAllocator.newSymbol("row_number", BIGINT),
+              Optional.of(toIntExact(node.getCount())),
+              Optional.empty());
+
+      return Optional.of(
+          new DecorrelationResult(
+              rowNumberNode,
+              childDecorrelationResult.symbolsToPropagate,
+              childDecorrelationResult.correlatedPredicates,
+              childDecorrelationResult.correlatedSymbolsMapping,
+              childDecorrelationResult.constantSymbols,
+              false));
+    }*/
+
+    @Override
+    public Optional<DecorrelationResult> visitTopK(TopKNode node, Void 
context) {
+      throw new SemanticException("TopK is not supported in correlated 
subquery for now");
+    }
+
+    /*@Override
+    public Optional<DecorrelationResult> visitTopK(TopKNode node, Void context)
+    {
+        if (node.getCount() == 0) {
+            return Optional.empty();
+        }
+
+        checkState(node.getChildren().size() == 1, "Expected TopKNode to have 
a single child");
+        Optional<DecorrelationResult> childDecorrelationResultOptional = 
node.getChildren().get(0).accept(this, null);
+        if (!childDecorrelationResultOptional.isPresent()) {
+            return Optional.empty();
+        }
+
+        DecorrelationResult childDecorrelationResult = 
childDecorrelationResultOptional.get();
+        if (childDecorrelationResult.atMostSingleRow) {
+            return childDecorrelationResultOptional;
+        }
+
+        PlanNode decorrelatedChildNode = childDecorrelationResult.node;
+        Set<Symbol> constantSymbols = 
childDecorrelationResult.getConstantSymbols();
+        Optional<OrderingScheme> decorrelatedOrderingScheme = 
decorrelateOrderingScheme(node.getOrderingScheme(), constantSymbols);
+
+        // no partitioning needed (no symbols to partition by)
+        if (childDecorrelationResult.symbolsToPropagate.isEmpty()) {
+            return decorrelatedOrderingScheme
+                    .map(orderingScheme -> new DecorrelationResult(
+                            // ordering symbols are present - return 
decorrelated TopNNode
+                            new TopKNode(node.getPlanNodeId(), 
decorrelatedChildNode, node.getCount(), orderingScheme, node.getStep()),
+                            childDecorrelationResult.symbolsToPropagate,
+                            childDecorrelationResult.correlatedPredicates,
+                            childDecorrelationResult.correlatedSymbolsMapping,
+                            childDecorrelationResult.constantSymbols,
+                            node.getCount() == 1))
+                    .or(() -> Optional.of(new DecorrelationResult(
+                            // no ordering symbols are left - convert to 
LimitNode
+                            new LimitNode(node.getPlanNodeId(), 
decorrelatedChildNode, node.getCount(), Optional.empty()),
+                            childDecorrelationResult.symbolsToPropagate,
+                            childDecorrelationResult.correlatedPredicates,
+                            childDecorrelationResult.correlatedSymbolsMapping,
+                            childDecorrelationResult.constantSymbols,
+                            node.getCount() == 1)));
+        }
+
+        if 
(!constantSymbols.containsAll(childDecorrelationResult.symbolsToPropagate)) {
+            return Optional.empty();
+        }
+
+        return decorrelatedOrderingScheme
+                .map(orderingScheme -> {
+                    // ordering symbols are present - rewrite TopN to 
TopNRankingNode partitioned by constant symbols
+                    TopNRankingNode topNRankingNode = new TopNRankingNode(
+                            node.getId(),
+                            decorrelatedChildNode,
+                            new DataOrganizationSpecification(
+                                    
ImmutableList.copyOf(childDecorrelationResult.symbolsToPropagate),
+                                    Optional.of(orderingScheme)),
+                            ROW_NUMBER,
+                            symbolAllocator.newSymbol("ranking", BIGINT),
+                            toIntExact(node.getCount()),
+                            false,
+                            Optional.empty());
+
+                    return Optional.of(new DecorrelationResult(
+                            topNRankingNode,
+                            childDecorrelationResult.symbolsToPropagate,
+                            childDecorrelationResult.correlatedPredicates,
+                            childDecorrelationResult.correlatedSymbolsMapping,
+                            childDecorrelationResult.constantSymbols,
+                            node.getCount() == 1));
+                })
+                .orElseGet(() -> {
+                    // no ordering symbols are left - rewrite TopN to 
RowNumberNode partitioned by constant symbols
+                    RowNumberNode rowNumberNode = new RowNumberNode(
+                            node.getId(),
+                            decorrelatedChildNode,
+                            
ImmutableList.copyOf(childDecorrelationResult.symbolsToPropagate),
+                            false,
+                            symbolAllocator.newSymbol("row_number", BIGINT),
+                            Optional.of(toIntExact(node.getCount())),
+                            Optional.empty());
+
+                    return Optional.of(new DecorrelationResult(
+                            rowNumberNode,
+                            childDecorrelationResult.symbolsToPropagate,
+                            childDecorrelationResult.correlatedPredicates,
+                            childDecorrelationResult.correlatedSymbolsMapping,
+                            childDecorrelationResult.constantSymbols,
+                            node.getCount() == 1));
+                });
+    }
+
+    private Optional<OrderingScheme> decorrelateOrderingScheme(
+        OrderingScheme orderingScheme, Set<Symbol> constantSymbols) {
+      // remove local and remote constant sort symbols from the OrderingScheme
+      ImmutableList.Builder<Symbol> nonConstantOrderBy = 
ImmutableList.builder();
+      ImmutableMap.Builder<Symbol, SortOrder> nonConstantOrderings = 
ImmutableMap.builder();
+      for (Symbol symbol : orderingScheme.getOrderBy()) {
+        if (!constantSymbols.contains(symbol) && 
!correlation.contains(symbol)) {
+          nonConstantOrderBy.add(symbol);
+          nonConstantOrderings.put(symbol, orderingScheme.getOrdering(symbol));
+        }
+      }
+      if (nonConstantOrderBy.build().isEmpty()) {
+        return Optional.empty();
+      }
+      return Optional.of(
+          new OrderingScheme(nonConstantOrderBy.build(), 
nonConstantOrderings.buildOrThrow()));
+    }*/
+
+    @Override
+    public Optional<DecorrelationResult> visitAggregation(AggregationNode 
node, Void context) {
+      // Aggregation with global grouping cannot be converted to aggregation 
grouped on constants.
+      // Theoretically, if there are no constants to group on, the aggregation 
could be successfully
+      // decorrelated.
+      // However, it can only happen when Aggregation's source plan is not 
correlated.
+      // Then:
+      // - either we should not reach here because uncorrelated subplans of 
correlated filters are
+      // not explored,
+      // - or the aggregation contains correlation which is unresolvable. This 
is indicated by
+      // returning Optional.empty().
+      if (node.hasEmptyGroupingSet()) {
+        return Optional.empty();
+      }
+
+      if (node.getGroupingSetCount() != 1) {
+        return Optional.empty();
+      }
+
+      Optional<DecorrelationResult> childDecorrelationResultOptional =
+          node.getChild().accept(this, null);
+      if (!childDecorrelationResultOptional.isPresent()) {
+        return Optional.empty();
+      }
+
+      DecorrelationResult childDecorrelationResult = 
childDecorrelationResultOptional.get();
+      Set<Symbol> constantSymbols = 
childDecorrelationResult.getConstantSymbols();
+
+      AggregationNode decorrelatedAggregation =
+          childDecorrelationResult
+              .getCorrelatedSymbolMapper()
+              .map(node, childDecorrelationResult.node);
+
+      Set<Symbol> groupingKeys = ImmutableSet.copyOf(node.getGroupingKeys());
+      checkState(
+          
ImmutableSet.copyOf(decorrelatedAggregation.getGroupingKeys()).equals(groupingKeys),
+          "grouping keys were correlated");
+      List<Symbol> symbolsToAdd =
+          childDecorrelationResult.symbolsToPropagate.stream()
+              .filter(symbol -> !groupingKeys.contains(symbol))
+              .collect(toImmutableList());
+
+      if (!constantSymbols.containsAll(symbolsToAdd)) {
+        return Optional.empty();
+      }
+
+      AggregationNode newAggregation =
+          AggregationNode.builderFrom(decorrelatedAggregation)
+              .setGroupingSets(
+                  singleGroupingSet(
+                      ImmutableList.<Symbol>builder()
+                          .addAll(node.getGroupingKeys())
+                          .addAll(symbolsToAdd)
+                          .build()))
+              .setPreGroupedSymbols(ImmutableList.of())
+              .build();
+
+      return Optional.of(
+          new DecorrelationResult(
+              newAggregation,
+              childDecorrelationResult.symbolsToPropagate,
+              childDecorrelationResult.correlatedPredicates,
+              childDecorrelationResult.correlatedSymbolsMapping,
+              childDecorrelationResult.constantSymbols,
+              constantSymbols.containsAll(newAggregation.getGroupingKeys())));
+    }
+
+    @Override
+    public Optional<DecorrelationResult> visitProject(ProjectNode node, Void 
context) {
+      Optional<DecorrelationResult> childDecorrelationResultOptional =
+          node.getChild().accept(this, null);
+      if (!childDecorrelationResultOptional.isPresent()) {
+        return Optional.empty();
+      }
+
+      DecorrelationResult childDecorrelationResult = 
childDecorrelationResultOptional.get();
+      Set<Symbol> nodeOutputSymbols = 
ImmutableSet.copyOf(node.getOutputSymbols());
+      List<Symbol> symbolsToAdd =
+          childDecorrelationResult.symbolsToPropagate.stream()
+              .filter(symbol -> !nodeOutputSymbols.contains(symbol))
+              .collect(toImmutableList());
+
+      Assignments assignments =
+          
Assignments.builder().putAll(node.getAssignments()).putIdentities(symbolsToAdd).build();
+
+      return Optional.of(
+          new DecorrelationResult(
+              new ProjectNode(node.getPlanNodeId(), 
childDecorrelationResult.node, assignments),
+              childDecorrelationResult.symbolsToPropagate,
+              childDecorrelationResult.correlatedPredicates,
+              childDecorrelationResult.correlatedSymbolsMapping,
+              childDecorrelationResult.constantSymbols,
+              childDecorrelationResult.atMostSingleRow));
+    }
+
+    private Multimap<Symbol, Symbol> extractCorrelatedSymbolsMapping(
+        List<Expression> correlatedConjuncts) {
+      ImmutableMultimap.Builder<Symbol, Symbol> mapping = 
ImmutableMultimap.builder();
+      for (Expression conjunct : correlatedConjuncts) {
+        if (!(conjunct instanceof ComparisonExpression)) {
+          continue;
+        }
+
+        ComparisonExpression comparison = (ComparisonExpression) conjunct;
+
+        if (!(comparison.getLeft() instanceof SymbolReference
+            && comparison.getRight() instanceof SymbolReference
+            && comparison.getOperator() == EQUAL)) {
+          continue;
+        }
+
+        Symbol left = Symbol.from(comparison.getLeft());
+        Symbol right = Symbol.from(comparison.getRight());
+
+        if (correlation.contains(left) && !correlation.contains(right)) {
+          mapping.put(left, right);
+        }
+
+        if (correlation.contains(right) && !correlation.contains(left)) {
+          mapping.put(right, left);
+        }
+      }
+
+      return mapping.build();
+    }
+
+    private Set<Symbol> extractConstantSymbols(List<Expression> 
correlatedConjuncts) {
+      ImmutableSet.Builder<Symbol> constants = ImmutableSet.builder();
+
+      correlatedConjuncts.stream()
+          .filter(ComparisonExpression.class::isInstance)
+          .map(ComparisonExpression.class::cast)
+          .filter(comparison -> comparison.getOperator() == EQUAL)
+          .forEach(
+              comparison -> {
+                Expression left = comparison.getLeft();
+                Expression right = comparison.getRight();
+
+                if (!isCorrelated(left)
+                    && (left instanceof SymbolReference || 
isSimpleInjectiveCast(left))
+                    && isConstant(right)) {
+                  constants.add(getSymbol(left));
+                }
+
+                if (!isCorrelated(right)
+                    && (right instanceof SymbolReference || 
isSimpleInjectiveCast(right))
+                    && isConstant(left)) {
+                  constants.add(getSymbol(right));
+                }
+              });
+
+      return constants.build();
+    }
+
+    // checks whether the expression is a deterministic combination of 
correlation symbols
+    private boolean isConstant(Expression expression) {
+      return isDeterministic(expression)
+          && ImmutableSet.copyOf(correlation)
+              .containsAll(SymbolsExtractor.extractUnique(expression));
+    }
+
+    // checks whether the expression is an injective cast over a symbol
+    private boolean isSimpleInjectiveCast(Expression expression) {
+      if (!(expression instanceof Cast)) {
+        return false;
+      }
+      Cast cast = (Cast) expression;
+      if (!(cast.getExpression() instanceof SymbolReference)) {
+        return false;
+      }
+      // simply return true for now
+      return true;
+      /*Symbol sourceSymbol = Symbol.from(cast.getExpression());
+
+      Type sourceType = 
symbolAllocator.getTypes().getTableModelType(sourceSymbol);
+      Type targetType = typeManager.getType(toTypeSignature(((Cast) 
expression).getType()));
+
+      return typeCoercion.isInjectiveCoercion(sourceType, targetType);*/
+    }
+
+    private Symbol getSymbol(Expression expression) {
+      if (expression instanceof SymbolReference) {
+        return Symbol.from(expression);
+      }
+      return Symbol.from(((Cast) expression).getExpression());
+    }
+
+    private boolean isCorrelated(Expression expression) {
+      return 
correlation.stream().anyMatch(SymbolsExtractor.extractUnique(expression)::contains);
+    }
+  }
+
+  private static class DecorrelationResult {
+    final PlanNode node;
+    final Set<Symbol> symbolsToPropagate;
+    final List<Expression> correlatedPredicates;
+
+    // mapping from correlated symbols to their uncorrelated equivalence
+    final Multimap<Symbol, Symbol> correlatedSymbolsMapping;
+
+    // local (uncorrelated) symbols known to be constant based on their 
dependency on correlation
+    // symbols
+    // they are derived from the filter predicate, e.g.
+    // a = corr --> a is constant
+    // b = f(corr_1, corr_2, ...) --> b is constant provided that f is 
deterministic
+    // cast(c AS ...) = corr --> c is constant provided that cast is injective
+    final Set<Symbol> constantSymbols;
+
+    // If a subquery has at most single row for any correlation values?
+    final boolean atMostSingleRow;
+
+    DecorrelationResult(
+        PlanNode node,
+        Set<Symbol> symbolsToPropagate,
+        List<Expression> correlatedPredicates,
+        Multimap<Symbol, Symbol> correlatedSymbolsMapping,
+        Set<Symbol> constantSymbols,
+        boolean atMostSingleRow) {
+      this.node = node;
+      this.symbolsToPropagate = symbolsToPropagate;
+      this.correlatedPredicates = correlatedPredicates;
+      this.atMostSingleRow = atMostSingleRow;
+      this.correlatedSymbolsMapping = correlatedSymbolsMapping;
+      this.constantSymbols = constantSymbols;
+      checkState(
+          constantSymbols.containsAll(correlatedSymbolsMapping.values()),
+          "Expected constant symbols to contain all correlated symbols local 
equivalents");
+      checkState(
+          symbolsToPropagate.containsAll(constantSymbols),
+          "Expected symbols to propagate to contain all constant symbols");
+    }
+
+    SymbolMapper getCorrelatedSymbolMapper() {
+      return symbolMapper(
+          correlatedSymbolsMapping.asMap().entrySet().stream()
+              .collect(
+                  toImmutableMap(
+                      Map.Entry::getKey, symbols -> 
Iterables.getLast(symbols.getValue()))));
+    }
+
+    /**
+     * @return constant symbols from a perspective of a subquery
+     */
+    Set<Symbol> getConstantSymbols() {
+      return constantSymbols;
+    }
+  }
+
+  private Optional<DecorrelatedNode> decorrelatedNode(
+      List<Expression> correlatedPredicates, PlanNode node, List<Symbol> 
correlation) {
+    if (containsCorrelation(node, correlation)) {
+      // node is still correlated ; /
+      return Optional.empty();
+    }
+    return Optional.of(new DecorrelatedNode(correlatedPredicates, node));
+  }
+
+  private boolean containsCorrelation(PlanNode node, List<Symbol> correlation) 
{
+    return Sets.union(
+            SymbolsExtractor.extractUnique(node, lookup),
+            SymbolsExtractor.extractOutputSymbols(node, lookup))
+        .stream()
+        .anyMatch(correlation::contains);
+  }
+
+  public static class DecorrelatedNode {
+    private final List<Expression> correlatedPredicates;
+    private final PlanNode node;
+
+    public DecorrelatedNode(List<Expression> correlatedPredicates, PlanNode 
node) {
+      requireNonNull(correlatedPredicates, "correlatedPredicates is null");
+      this.correlatedPredicates = ImmutableList.copyOf(correlatedPredicates);
+      this.node = requireNonNull(node, "node is null");
+    }
+
+    public Optional<Expression> getCorrelatedPredicates() {
+      if (correlatedPredicates.isEmpty()) {
+        return Optional.empty();
+      }
+      return Optional.of(and(correlatedPredicates));
+    }
+
+    public PlanNode getNode() {
+      return node;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index f039c09a701..ba9487c2ba9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -47,6 +47,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ReplaceSymbolInExpression;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
@@ -81,6 +82,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ATTRIBUTE;
@@ -984,6 +986,19 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
       return output;
     }
 
+    @Override
+    public PlanNode visitAssignUniqueId(AssignUniqueId node, RewriteContext 
context) {
+      Expression inheritedPredicate =
+          context.inheritedPredicate != null ? context.inheritedPredicate : 
TRUE_LITERAL;
+      Set<Symbol> predicateSymbols = extractUnique(inheritedPredicate);
+      checkState(
+          !predicateSymbols.contains(node.getIdColumn()),
+          "UniqueId in predicate is not yet supported");
+      PlanNode rewrittenChild = node.getChild().accept(this, context);
+      node.replaceChildren(ImmutableList.of(rewrittenChild));
+      return node;
+    }
+
     @Override
     public PlanNode visitInsertTablet(InsertTabletNode node, RewriteContext 
context) {
       return node;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformQuantifiedComparisonApplyToCorrelatedJoin.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformQuantifiedComparisonApplyToCorrelatedJoin.java
index 00fecc1a69a..a867faaa6e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformQuantifiedComparisonApplyToCorrelatedJoin.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformQuantifiedComparisonApplyToCorrelatedJoin.java
@@ -21,12 +21,7 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
 
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature;
-import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionId;
-import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionKind;
-import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.FunctionNullability;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
-import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.SimplePlanRewriter;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
@@ -54,7 +49,6 @@ import org.apache.tsfile.read.common.type.Type;
 
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Optional;
 import java.util.function.Function;
 
@@ -66,6 +60,7 @@ import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.globalAggregation;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleAggregation;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode.Quantifier.ALL;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Util.getResolvedBuiltInAggregateFunction;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.FALSE_LITERAL;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.EQUAL;
@@ -142,7 +137,7 @@ public class 
TransformQuantifiedComparisonApplyToCorrelatedJoin implements PlanO
                   minValue,
                       new AggregationNode.Aggregation(
                           getResolvedBuiltInAggregateFunction(
-                              "min", ImmutableList.of(outputColumnType)),
+                              metadata, "min", 
ImmutableList.of(outputColumnType)),
                           outputColumnReferences,
                           false,
                           Optional.empty(),
@@ -151,7 +146,7 @@ public class 
TransformQuantifiedComparisonApplyToCorrelatedJoin implements PlanO
                   maxValue,
                       new AggregationNode.Aggregation(
                           getResolvedBuiltInAggregateFunction(
-                              "max", ImmutableList.of(outputColumnType)),
+                              metadata, "max", 
ImmutableList.of(outputColumnType)),
                           outputColumnReferences,
                           false,
                           Optional.empty(),
@@ -160,7 +155,7 @@ public class 
TransformQuantifiedComparisonApplyToCorrelatedJoin implements PlanO
                   countAllValue,
                       new AggregationNode.Aggregation(
                           getResolvedBuiltInAggregateFunction(
-                              "count_all", ImmutableList.of(outputColumnType)),
+                              metadata, "count_all", 
ImmutableList.of(outputColumnType)),
                           outputColumnReferences,
                           false,
                           Optional.empty(),
@@ -169,7 +164,7 @@ public class 
TransformQuantifiedComparisonApplyToCorrelatedJoin implements PlanO
                   countNonNullValue,
                       new AggregationNode.Aggregation(
                           getResolvedBuiltInAggregateFunction(
-                              "count", ImmutableList.of(outputColumnType)),
+                              metadata, "count", 
ImmutableList.of(outputColumnType)),
                           outputColumnReferences,
                           false,
                           Optional.empty(),
@@ -197,18 +192,6 @@ public class 
TransformQuantifiedComparisonApplyToCorrelatedJoin implements PlanO
           join, Assignments.of(quantifiedComparisonSymbol, 
valueComparedToSubquery));
     }
 
-    private ResolvedFunction getResolvedBuiltInAggregateFunction(
-        String functionName, List<Type> argumentTypes) {
-      // The same as the code in ExpressionAnalyzer
-      Type type = metadata.getFunctionReturnType(functionName, argumentTypes);
-      return new ResolvedFunction(
-          new BoundSignature(functionName.toLowerCase(Locale.ENGLISH), type, 
argumentTypes),
-          new FunctionId("noop"),
-          FunctionKind.AGGREGATE,
-          true,
-          
FunctionNullability.getAggregationFunctionNullability(argumentTypes.size()));
-    }
-
     public Expression rewriteUsingBounds(
         ApplyNode.QuantifiedComparison quantifiedComparison,
         Symbol minValue,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
index 2b414438983..2e32cc13acd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.DeterminismEvaluator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
@@ -760,6 +761,18 @@ public class UnaliasSymbolReferences implements 
PlanOptimizer {
               newSemiJoinOutput),
           outputMapping);
     }
+
+    @Override
+    public PlanAndMappings visitAssignUniqueId(AssignUniqueId node, 
UnaliasContext context) {
+      PlanAndMappings rewrittenSource = node.getChild().accept(this, context);
+      Map<Symbol, Symbol> mapping = new 
HashMap<>(rewrittenSource.getMappings());
+      SymbolMapper mapper = symbolMapper(mapping);
+
+      Symbol newUnique = mapper.map(node.getIdColumn());
+
+      return new PlanAndMappings(
+          new AssignUniqueId(node.getPlanNodeId(), rewrittenSource.getRoot(), 
newUnique), mapping);
+    }
   }
 
   private static class UnaliasContext {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java
index 83798c451bc..523dd09f9f9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java
@@ -21,6 +21,11 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
 
 import 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction;
 import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionId;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionKind;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.FunctionNullability;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
@@ -33,6 +38,8 @@ import org.apache.tsfile.read.common.type.Type;
 import org.apache.tsfile.utils.Pair;
 
 import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 
@@ -217,4 +224,16 @@ public class Util {
             node.getGroupIdSymbol()),
         rightResult);
   }
+
+  public static ResolvedFunction getResolvedBuiltInAggregateFunction(
+      Metadata metadata, String functionName, List<Type> argumentTypes) {
+    // The same as the code in ExpressionAnalyzer
+    Type type = metadata.getFunctionReturnType(functionName, argumentTypes);
+    return new ResolvedFunction(
+        new BoundSignature(functionName.toLowerCase(Locale.ENGLISH), type, 
argumentTypes),
+        new FunctionId("noop"),
+        FunctionKind.AGGREGATE,
+        true,
+        
FunctionNullability.getAggregationFunctionNullability(argumentTypes.size()));
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/AssignUniqueIdMatcher.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/AssignUniqueIdMatcher.java
new file mode 100644
index 00000000000..6ff4a6b9e4c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/AssignUniqueIdMatcher.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.assertions;
+
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
+
+import java.util.Optional;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+public class AssignUniqueIdMatcher implements RvalueMatcher {
+  @Override
+  public Optional<Symbol> getAssignedSymbol(
+      PlanNode node, SessionInfo sessionInfo, Metadata metadata, SymbolAliases 
symbolAliases) {
+    if (!(node instanceof AssignUniqueId)) {
+      return Optional.empty();
+    }
+
+    AssignUniqueId assignUniqueIdNode = (AssignUniqueId) node;
+
+    return Optional.of(assignUniqueIdNode.getIdColumn());
+  }
+
+  @Override
+  public String toString() {
+    return toStringHelper(this).toString();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
index 5c2ce8e2f80..6d8b3e98628 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.GroupRe
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
@@ -472,6 +473,11 @@ public final class PlanMatchPattern {
         .with(new SemiJoinMatcher(sourceSymbolAlias, filteringSymbolAlias, 
outputAlias));
   }
 
+  public static PlanMatchPattern assignUniqueId(String uniqueSymbolAlias, 
PlanMatchPattern source) {
+    return node(AssignUniqueId.class, source)
+        .withAlias(uniqueSymbolAlias, new AssignUniqueIdMatcher());
+  }
+
   public static PlanMatchPattern streamSort(PlanMatchPattern source) {
     return node(StreamSortNode.class, source);
   }

Reply via email to