This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch support_uncorrelated_in_predicate
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 010aec8df6a60aec3ec37e6af40a77f766ec3149
Author: lancelly <[email protected]>
AuthorDate: Fri Dec 6 16:29:32 2024 +0800

    Add support for row type && Add SemiJoinNode && Add uncorrelated InPredicate related rules
---
 .../plan/planner/plan/node/PlanNodeType.java       |   4 +
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../plan/relational/planner/IrTypeAnalyzer.java    |  11 ++
 .../plan/relational/planner/SubqueryPlanner.java   |   7 +-
 .../plan/relational/planner/TranslationMap.java    |   8 ++
 .../relational/planner/ir/ExpressionRewriter.java  |   6 +
 .../planner/ir/ExpressionTreeRewriter.java         |  44 ++++++
 .../planner/iterative/rule/PruneApplyColumns.java  | 138 +++++++++++++++++++
 .../iterative/rule/PruneApplyCorrelation.java      |  70 ++++++++++
 .../iterative/rule/PruneApplySourceColumns.java    |  95 +++++++++++++
 .../rule/RemoveUnreferencedScalarApplyNodes.java   |  42 ++++++
 .../rule/RemoveUnreferencedScalarSubqueries.java   |  70 ++++++++++
 ...mUncorrelatedInPredicateSubqueryToSemiJoin.java |  95 +++++++++++++
 .../plan/relational/planner/node/SemiJoinNode.java | 148 ++++++++++++++++++++
 .../optimizations/LogicalOptimizeFactory.java      |  13 +-
 .../optimizations/UnaliasSymbolReferences.java     |  29 ++++
 .../plan/relational/sql/ast/AstVisitor.java        |   8 ++
 .../plan/relational/sql/ast/RowDataType.java       | 149 +++++++++++++++++++++
 .../relational/sql/util/ExpressionFormatter.java   |  20 +++
 .../plan/relational/type/InternalTypeManager.java  |  44 +++++-
 .../plan/relational/type/NamedType.java            |  62 +++++++++
 .../plan/relational/type/ParametricType.java       |  30 +++++
 .../plan/relational/type/RowParametricType.java    |  59 ++++++++
 .../plan/relational/type/TypeParameter.java        | 128 ++++++++++++++++++
 .../relational/type/TypeSignatureTranslator.java   |  48 ++++++-
 25 files changed, 1322 insertions(+), 11 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 3e53bc69690..62fa6362edc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -120,6 +120,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingl
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.ConstructTableDevicesBlackListNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode;
@@ -280,6 +281,7 @@ public enum PlanNodeType {
   TABLE_EXCHANGE_NODE((short) 1018),
   TABLE_EXPLAIN_ANALYZE_NODE((short) 1019),
   TABLE_ENFORCE_SINGLE_ROW_NODE((short) 1020),
+  TABLE_SEMI_JOIN_NODE((short) 1021),
 
   RELATIONAL_INSERT_TABLET((short) 2000),
   RELATIONAL_INSERT_ROW((short) 2001),
@@ -637,6 +639,8 @@ public enum PlanNodeType {
         throw new UnsupportedOperationException("ExplainAnalyzeNode should not 
be deserialized");
       case 1020:
         return EnforceSingleRowNode.deserialize(buffer);
+      case 1021:
+        return SemiJoinNode.deserialize(buffer);
       case 2000:
         return RelationalInsertTabletNode.deserialize(buffer);
       case 2001:
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 682f2a65202..0c09911b053 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -124,6 +124,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.GroupRe
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.ConstructTableDevicesBlackListNode;
@@ -740,6 +741,10 @@ public abstract class PlanVisitor<R, C> {
     return visitTwoChildProcess(node, context);
   }
 
+  public R visitSemiJoin(SemiJoinNode node, C context) {
+    return visitPlan(node, context);
+  }
+
   public R visitGroupReference(GroupReference node, C context) {
     return visitPlan(node, context);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/IrTypeAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/IrTypeAnalyzer.java
index cdf01015f40..1a19f40132f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/IrTypeAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/IrTypeAnalyzer.java
@@ -54,6 +54,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NotExpression;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullIfExpression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Row;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SearchedCaseExpression;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleCaseExpression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.StringLiteral;
@@ -63,6 +64,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.tsfile.read.common.type.BlobType;
 import org.apache.tsfile.read.common.type.DateType;
+import org.apache.tsfile.read.common.type.RowType;
 import org.apache.tsfile.read.common.type.StringType;
 import org.apache.tsfile.read.common.type.TimestampType;
 import org.apache.tsfile.read.common.type.Type;
@@ -75,6 +77,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableList.toImmutableList;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toTypeSignature;
 import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
@@ -416,6 +419,14 @@ public class IrTypeAnalyzer {
       return setExpressionType(node, BOOLEAN);
     }
 
+    @Override
+    protected Type visitRow(Row node, Context context) {
+      List<Type> types =
+          node.getItems().stream().map(child -> process(child, 
context)).collect(toImmutableList());
+
+      return setExpressionType(node, RowType.anonymous(types));
+    }
+
     @Override
     protected Type visitLikePredicate(LikePredicate node, Context context) {
       process(node.getValue(), context);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java
index 82f90ac36da..59b60abba08 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java
@@ -571,12 +571,7 @@ class SubqueryPlanner {
             new ProjectNode(
                 idAllocator.genPlanNodeId(),
                 relationPlan.getRoot(),
-                Assignments.of(
-                    column,
-                    new Cast(
-                        new Row(fields.build()),
-                        new GenericDataType(
-                            new Identifier(type.toString()), 
ImmutableList.of())))));
+                Assignments.of(column, new Cast(new Row(fields.build()), 
toSqlType(type)))));
 
     return coerceIfNecessary(subqueryPlan, column, subquery, coercion);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TranslationMap.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TranslationMap.java
index 6a01f59e2b7..80300c47760 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TranslationMap.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TranslationMap.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LikePredicate;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RowDataType;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Trim;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.util.AstUtil;
@@ -357,6 +358,13 @@ public class TranslationMap {
             // do not rewrite identifiers within type parameters
             return node;
           }
+
+          @Override
+          public Expression rewriteRowDataType(
+              RowDataType node, Void context, ExpressionTreeRewriter<Void> 
treeRewriter) {
+            // do not rewrite identifiers in field names
+            return node;
+          }
         },
         expression);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/ExpressionRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/ExpressionRewriter.java
index d232101d603..735b7425041 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/ExpressionRewriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/ExpressionRewriter.java
@@ -47,6 +47,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullIfExpression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QuantifiedComparisonExpression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Row;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RowDataType;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SearchedCaseExpression;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleCaseExpression;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SubqueryExpression;
@@ -231,4 +232,9 @@ public class ExpressionRewriter<C> {
       GenericDataType node, C context, ExpressionTreeRewriter<C> treeRewriter) 
{
     return rewriteExpression(node, context, treeRewriter);
   }
+
+  public Expression rewriteRowDataType(
+      RowDataType node, C context, ExpressionTreeRewriter<C> treeRewriter) {
+    return rewriteExpression(node, context, treeRewriter);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/ExpressionTreeRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/ExpressionTreeRewriter.java
index cab412becfc..1632fb51339 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/ExpressionTreeRewriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/ExpressionTreeRewriter.java
@@ -50,6 +50,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NumericParameter;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QuantifiedComparisonExpression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Row;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RowDataType;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SearchedCaseExpression;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleCaseExpression;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SubqueryExpression;
@@ -642,6 +643,49 @@ public final class ExpressionTreeRewriter<C> {
       return node;
     }
 
+    @Override
+    protected Expression visitRowDataType(RowDataType node, Context<C> 
context) {
+      if (!context.isDefaultRewrite()) {
+        Expression result =
+            rewriter.rewriteRowDataType(node, context.get(), 
ExpressionTreeRewriter.this);
+        if (result != null) {
+          return result;
+        }
+      }
+
+      ImmutableList.Builder<RowDataType.Field> rewritten = 
ImmutableList.builder();
+      for (RowDataType.Field field : node.getFields()) {
+        DataType dataType = rewrite(field.getType(), context.get());
+
+        Optional<Identifier> name = field.getName();
+
+        if (field.getName().isPresent()) {
+          Identifier identifier = field.getName().get();
+          Identifier rewrittenIdentifier = rewrite(identifier, context.get());
+
+          if (identifier != rewrittenIdentifier) {
+            name = Optional.of(rewrittenIdentifier);
+          }
+        }
+
+        @SuppressWarnings("OptionalEquality")
+        boolean nameRewritten = name != field.getName();
+        if (dataType != field.getType() || nameRewritten) {
+          rewritten.add(new RowDataType.Field(field.getLocation(), name, 
dataType));
+        } else {
+          rewritten.add(field);
+        }
+      }
+
+      List<RowDataType.Field> fields = rewritten.build();
+
+      if (!sameElements(fields, node.getFields())) {
+        return new RowDataType(node.getLocation(), fields);
+      }
+
+      return node;
+    }
+
     @Override
     public Expression visitCurrentDatabase(final CurrentDatabase node, final 
Context<C> context) {
       if (!context.isDefaultRewrite()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneApplyColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneApplyColumns.java
new file mode 100644
index 00000000000..11dcaefba17
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneApplyColumns.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.google.common.collect.Sets.intersection;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.restrictOutputs;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.applyNode;
+
+/**
+ * This rule restricts the outputs of ApplyNode's input and subquery based on which ApplyNode's
+ * output symbols are referenced.
+ *
+ * <p>A symbol from input source can be pruned, when - it is not a referenced output symbol - it is
+ * not a correlation symbol - it is not referenced in subqueryAssignments
+ *
+ * <p>A symbol from subquery source can be pruned, when it is not referenced in subqueryAssignments.
+ *
+ * <p>A subquery assignment can be removed, when its key is not a referenced output symbol.
+ *
+ * <p>Note: this rule does not remove any symbols from the correlation list. This is responsibility
+ * of PruneApplyCorrelation rule.
+ *
+ * <p>Transforms:
+ *
+ * <pre>
+ * - Project (i1, r1)
+ *      - Apply
+ *          correlation: [corr]
+ *          assignments:
+ *              r1 -> a in s1,
+ *              r2 -> b in s2,
+ *          - Input (a, b, corr)
+ *          - Subquery (s1, s2)
+ * </pre>
+ *
+ * Into:
+ *
+ * <pre>
+ * - Project (i1, r1)
+ *      - Apply
+ *          correlation: [corr]
+ *          assignments:
+ *              r1 -> a in s1,
+ *          - Project (a, corr)
+ *              - Input (a, b, corr)
+ *          - Project (s1)
+ *              - Subquery (s1, s2)
+ * </pre>
+ */
+public class PruneApplyColumns extends ProjectOffPushDownRule<ApplyNode> {
+  public PruneApplyColumns() {
+    super(applyNode());
+  }
+
+  @Override
+  protected Optional<PlanNode> pushDownProjectOff(
+      Context context, ApplyNode applyNode, Set<Symbol> referencedOutputs) {
+    // remove unused apply node
+    if (intersection(applyNode.getSubqueryAssignments().keySet(), 
referencedOutputs).isEmpty()) {
+      return Optional.of(applyNode.getInput());
+    }
+
+    // extract referenced assignments
+    ImmutableSet.Builder<Symbol> requiredAssignmentsSymbols = 
ImmutableSet.builder();
+    ImmutableMap.Builder<Symbol, ApplyNode.SetExpression> 
newSubqueryAssignments =
+        ImmutableMap.builder();
+    for (Map.Entry<Symbol, ApplyNode.SetExpression> entry :
+        applyNode.getSubqueryAssignments().entrySet()) {
+      if (referencedOutputs.contains(entry.getKey())) {
+        requiredAssignmentsSymbols.addAll(entry.getValue().inputs());
+        newSubqueryAssignments.put(entry);
+      }
+    }
+
+    // prune subquery symbols
+    Optional<PlanNode> newSubquery =
+        restrictOutputs(
+            context.getIdAllocator(), applyNode.getSubquery(), 
requiredAssignmentsSymbols.build());
+
+    // prune input symbols
+    Set<Symbol> requiredInputSymbols =
+        ImmutableSet.<Symbol>builder()
+            .addAll(referencedOutputs)
+            .addAll(applyNode.getCorrelation())
+            .addAll(requiredAssignmentsSymbols.build())
+            .build();
+
+    Optional<PlanNode> newInput =
+        restrictOutputs(context.getIdAllocator(), applyNode.getInput(), 
requiredInputSymbols);
+
+    boolean pruned =
+        newSubquery.isPresent()
+            || newInput.isPresent()
+            || newSubqueryAssignments.buildOrThrow().size()
+                < applyNode.getSubqueryAssignments().size();
+
+    if (pruned) {
+      return Optional.of(
+          new ApplyNode(
+              applyNode.getPlanNodeId(),
+              newInput.orElse(applyNode.getInput()),
+              newSubquery.orElse(applyNode.getSubquery()),
+              newSubqueryAssignments.buildOrThrow(),
+              applyNode.getCorrelation(),
+              applyNode.getOriginSubquery()));
+    }
+
+    return Optional.empty();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneApplyCorrelation.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneApplyCorrelation.java
new file mode 100644
index 00000000000..942c5ba6e63
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneApplyCorrelation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor.extractUnique;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.applyNode;
+
+/**
+ * This rule updates ApplyNode's correlation list. A symbol can be removed from the correlation list
+ * if it is not referenced by the subquery node. Note: This rule does not restrict ApplyNode's
+ * children outputs. It requires additional information about context (symbols required by the outer
+ * plan) and is done in PruneApplyColumns rule.
+ */
+public class PruneApplyCorrelation implements Rule<ApplyNode> {
+  private static final Pattern<ApplyNode> PATTERN = applyNode();
+
+  @Override
+  public Pattern<ApplyNode> getPattern() {
+    return PATTERN;
+  }
+
+  @Override
+  public Result apply(ApplyNode applyNode, Captures captures, Context context) 
{
+    Set<Symbol> subquerySymbols = extractUnique(applyNode.getSubquery(), 
context.getLookup());
+    List<Symbol> newCorrelation =
+        applyNode.getCorrelation().stream()
+            .filter(subquerySymbols::contains)
+            .collect(toImmutableList());
+
+    if (newCorrelation.size() < applyNode.getCorrelation().size()) {
+      return Result.ofPlanNode(
+          new ApplyNode(
+              applyNode.getPlanNodeId(),
+              applyNode.getInput(),
+              applyNode.getSubquery(),
+              applyNode.getSubqueryAssignments(),
+              newCorrelation,
+              applyNode.getOriginSubquery()));
+    }
+
+    return Result.empty();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneApplySourceColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneApplySourceColumns.java
new file mode 100644
index 00000000000..746866102d2
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneApplySourceColumns.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.Optional;
+import java.util.Set;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.restrictOutputs;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.applyNode;
+
+/**
+ * This rule restricts outputs of ApplyNode's subquery to include only the symbols needed for
+ * subqueryAssignments. Symbols from the subquery are not produced at ApplyNode's output. They are
+ * only used for the assignments. Transforms:
+ *
+ * <pre>
+ * - Apply
+ *      correlation: [corr_symbol]
+ *      assignments:
+ *          result_1 -> a in subquery_symbol_1,
+ *          result_2 -> b > ALL subquery_symbol_2
+ *    - Input (a, b, corr_symbol)
+ *    - Subquery (subquery_symbol_1, subquery_symbol_2, subquery_symbol_3)
+ * </pre>
+ *
+ * Into:
+ *
+ * <pre>
+ * - Apply
+ *      correlation: [corr_symbol]
+ *      assignments:
+ *          result_1 -> a in subquery_symbol_1,
+ *          result_2 -> b > ALL subquery_symbol_2
+ *    - Input (a, b, corr_symbol)
+ *    - Project
+ *          subquery_symbol_1 -> subquery_symbol_1
+ *          subquery_symbol_2 -> subquery_symbol_2
+ *        - Subquery (subquery_symbol_1, subquery_symbol_2, subquery_symbol_3)
+ * </pre>
+ *
+ * Note: ApplyNode's input symbols are produced on ApplyNode's output. They cannot be pruned without
+ * outer context.
+ */
+public class PruneApplySourceColumns implements Rule<ApplyNode> {
+  private static final Pattern<ApplyNode> PATTERN = applyNode();
+
+  @Override
+  public Pattern<ApplyNode> getPattern() {
+    return PATTERN;
+  }
+
+  @Override
+  public Result apply(ApplyNode applyNode, Captures captures, Context context) 
{
+    Set<Symbol> subqueryAssignmentsSymbols =
+        applyNode.getSubqueryAssignments().values().stream()
+            .flatMap(expression -> expression.inputs().stream())
+            .collect(toImmutableSet());
+
+    Optional<PlanNode> prunedSubquery =
+        restrictOutputs(
+            context.getIdAllocator(), applyNode.getSubquery(), 
subqueryAssignmentsSymbols);
+    return prunedSubquery
+        .map(
+            subquery -> 
applyNode.replaceChildren(ImmutableList.of(applyNode.getInput(), subquery)))
+        .map(Result::ofPlanNode)
+        .orElse(Result.empty());
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveUnreferencedScalarApplyNodes.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveUnreferencedScalarApplyNodes.java
new file mode 100644
index 00000000000..fcecc2c3b82
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveUnreferencedScalarApplyNodes.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.applyNode;
+
+public class RemoveUnreferencedScalarApplyNodes implements Rule<ApplyNode> {
+  private static final Pattern<ApplyNode> PATTERN =
+      applyNode().matching(applyNode -> 
applyNode.getSubqueryAssignments().isEmpty());
+
+  @Override
+  public Pattern<ApplyNode> getPattern() {
+    return PATTERN;
+  }
+
+  @Override
+  public Result apply(ApplyNode applyNode, Captures captures, Context context) 
{
+    return Result.ofPlanNode(applyNode.getInput());
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveUnreferencedScalarSubqueries.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveUnreferencedScalarSubqueries.java
new file mode 100644
index 00000000000..badc7dff3c4
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveUnreferencedScalarSubqueries.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Lookup;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.INNER;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.LEFT;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.RIGHT;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.CorrelatedJoin.filter;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.correlatedJoin;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.QueryCardinalityUtil.isAtLeastScalar;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.QueryCardinalityUtil.isScalar;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
+
+/**
+ * Removes a {@link CorrelatedJoinNode} with a {@code TRUE} filter when one of its sides is an
+ * unreferenced scalar, i.e. it has no output symbols and passes {@code isScalar}. The join is
+ * replaced by its other side, but only when the join type guarantees that the row count of that
+ * other side is unchanged by the join.
+ */
+public class RemoveUnreferencedScalarSubqueries implements Rule<CorrelatedJoinNode> {
+  private static final Pattern<CorrelatedJoinNode> PATTERN =
+      correlatedJoin().with(filter().equalTo(TRUE_LITERAL));
+
+  @Override
+  public Pattern<CorrelatedJoinNode> getPattern() {
+    return PATTERN;
+  }
+
+  /**
+   * Replaces the join with its subquery or its input when the opposite side is an unreferenced
+   * scalar.
+   *
+   * @return the surviving side as the new plan, or an empty result when neither side can be
+   *     dropped safely
+   */
+  @Override
+  public Result apply(CorrelatedJoinNode correlatedJoinNode, Captures captures, Context context) {
+    PlanNode input = correlatedJoinNode.getInput();
+    PlanNode subquery = correlatedJoinNode.getSubquery();
+    Lookup lookup = context.getLookup();
+
+    if (isUnreferencedScalar(input, lookup) && correlatedJoinNode.getCorrelation().isEmpty()) {
+      // A join that preserves the input side (LEFT/FULL) still emits the input row when the
+      // subquery is empty, so the subquery may only stand in if it yields at least one row.
+      if (correlatedJoinNode.getJoinType() == INNER
+          || correlatedJoinNode.getJoinType() == RIGHT
+          || isAtLeastScalar(subquery, lookup)) {
+        return Result.ofPlanNode(subquery);
+      }
+    }
+
+    if (isUnreferencedScalar(subquery, lookup)) {
+      // Symmetric guard: a join that preserves the subquery side (RIGHT/FULL) still emits the
+      // subquery row when the input is empty, so the input may only stand in if it yields at
+      // least one row.
+      if (correlatedJoinNode.getJoinType() == INNER
+          || correlatedJoinNode.getJoinType() == LEFT
+          || isAtLeastScalar(input, lookup)) {
+        return Result.ofPlanNode(input);
+      }
+    }
+
+    return Result.empty();
+  }
+
+  /** Returns true when the node exposes no output symbols and passes {@code isScalar}. */
+  private boolean isUnreferencedScalar(PlanNode planNode, Lookup lookup) {
+    return planNode.getOutputSymbols().isEmpty() && isScalar(planNode, lookup);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/TransformUncorrelatedInPredicateSubqueryToSemiJoin.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/TransformUncorrelatedInPredicateSubqueryToSemiJoin.java
new file mode 100644
index 00000000000..80fe79c7e5f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/TransformUncorrelatedInPredicateSubqueryToSemiJoin.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.Apply.correlation;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.applyNode;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern.empty;
+
+/**
+ * This optimizer looks for InPredicate expressions in ApplyNodes and replaces the nodes with
+ * SemiJoin nodes.
+ *
+ * <p>Plan before optimizer:
+ *
+ * <pre>
+ * Filter(a IN b):
+ *   Apply
+ *     - correlation: []  // empty
+ *     - input: some plan A producing symbol a
+ *     - subquery: some plan B producing symbol b
+ * </pre>
+ *
+ * <p>Plan after optimizer:
+ *
+ * <pre>
+ * Filter(semijoinresult):
+ *   SemiJoin
+ *     - source: plan A
+ *     - filteringSource: B
+ *     - sourceJoinSymbol: symbol a
+ *     - filteringSourceJoinSymbol: symbol b
+ *     - semiJoinOutput: semijoinresult
+ * </pre>
+ */
+public class TransformUncorrelatedInPredicateSubqueryToSemiJoin implements Rule<ApplyNode> {
+  // Only Apply nodes with an empty correlation list are matched.
+  private static final Pattern<ApplyNode> PATTERN = applyNode().with(empty(correlation()));
+
+  @Override
+  public Pattern<ApplyNode> getPattern() {
+    return PATTERN;
+  }
+
+  /**
+   * Rewrites the Apply node into a {@link SemiJoinNode} when it carries exactly one subquery
+   * assignment and that assignment is an {@code ApplyNode.In}; otherwise leaves the plan untouched.
+   */
+  @Override
+  public Result apply(ApplyNode applyNode, Captures captures, Context context) {
+    boolean hasSingleAssignment = applyNode.getSubqueryAssignments().size() == 1;
+    if (!hasSingleAssignment) {
+      return Result.empty();
+    }
+
+    ApplyNode.SetExpression onlyExpression =
+        getOnlyElement(applyNode.getSubqueryAssignments().values());
+    if (onlyExpression instanceof ApplyNode.In) {
+      ApplyNode.In in = (ApplyNode.In) onlyExpression;
+      // The symbol the IN result was assigned to becomes the semi join's output symbol.
+      Symbol outputSymbol = getOnlyElement(applyNode.getSubqueryAssignments().keySet());
+
+      return Result.ofPlanNode(
+          new SemiJoinNode(
+              context.getIdAllocator().genPlanNodeId(),
+              applyNode.getInput(),
+              applyNode.getSubquery(),
+              in.getValue(),
+              in.getReference(),
+              outputSymbol));
+    }
+
+    return Result.empty();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SemiJoinNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SemiJoinNode.java
new file mode 100644
index 00000000000..94cdb47cddc
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SemiJoinNode.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TwoChildProcessNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+
+import com.google.common.collect.ImmutableList;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Two-child plan node holding a source, a filtering source, one join symbol from each side and
+ * an extra output symbol. Its output is every output symbol of the source followed by {@code
+ * semiJoinOutput}.
+ */
+public class SemiJoinNode extends TwoChildProcessNode {
+  private final Symbol sourceJoinSymbol;
+  private final Symbol filteringSourceJoinSymbol;
+  private final Symbol semiJoinOutput;
+
+  /**
+   * Creates the node.
+   *
+   * @param source left child; may be null when the node is built without children (see {@link
+   *     #clone()} and {@link #deserialize(ByteBuffer)})
+   * @param filteringSource right child; may be null under the same circumstances
+   * @throws NullPointerException if any of the three symbols is null
+   * @throws IllegalArgumentException if a non-null child does not output its join symbol
+   */
+  public SemiJoinNode(
+      PlanNodeId id,
+      PlanNode source,
+      PlanNode filteringSource,
+      Symbol sourceJoinSymbol,
+      Symbol filteringSourceJoinSymbol,
+      Symbol semiJoinOutput) {
+    super(id, source, filteringSource);
+    this.sourceJoinSymbol = requireNonNull(sourceJoinSymbol, "sourceJoinSymbol is null");
+    this.filteringSourceJoinSymbol =
+        requireNonNull(filteringSourceJoinSymbol, "filteringSourceJoinSymbol is null");
+    this.semiJoinOutput = requireNonNull(semiJoinOutput, "semiJoinOutput is null");
+
+    // clone() and deserialize() construct the node with null children, so the containment
+    // checks may only dereference a child that is actually present.
+    if (source != null) {
+      checkArgument(
+          source.getOutputSymbols().contains(sourceJoinSymbol),
+          "Source does not contain join symbol");
+    }
+    if (filteringSource != null) {
+      checkArgument(
+          filteringSource.getOutputSymbols().contains(filteringSourceJoinSymbol),
+          "Filtering source does not contain filtering join symbol");
+    }
+  }
+
+  public PlanNode getSource() {
+    return leftChild;
+  }
+
+  public PlanNode getFilteringSource() {
+    return rightChild;
+  }
+
+  public Symbol getSourceJoinSymbol() {
+    return sourceJoinSymbol;
+  }
+
+  public Symbol getFilteringSourceJoinSymbol() {
+    return filteringSourceJoinSymbol;
+  }
+
+  public Symbol getSemiJoinOutput() {
+    return semiJoinOutput;
+  }
+
+  /** Returns the source's output symbols followed by {@code semiJoinOutput}. */
+  @Override
+  public List<Symbol> getOutputSymbols() {
+    return ImmutableList.<Symbol>builder()
+        .addAll(leftChild.getOutputSymbols())
+        .add(semiJoinOutput)
+        .build();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitSemiJoin(this, context);
+  }
+
+  /**
+   * Returns a copy of this node over the given children: index 0 is the source, index 1 the
+   * filtering source.
+   */
+  @Override
+  public PlanNode replaceChildren(List<PlanNode> newChildren) {
+    checkArgument(newChildren.size() == 2, "expected newChildren to contain 2 nodes");
+    return new SemiJoinNode(
+        getPlanNodeId(),
+        newChildren.get(0),
+        newChildren.get(1),
+        sourceJoinSymbol,
+        filteringSourceJoinSymbol,
+        semiJoinOutput);
+  }
+
+  @Override
+  public PlanNode clone() {
+    // clone without children
+    return new SemiJoinNode(
+        getPlanNodeId(), null, null, sourceJoinSymbol, filteringSourceJoinSymbol, semiJoinOutput);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Writes the node type marker followed by the three symbols. */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.TABLE_SEMI_JOIN_NODE.serialize(byteBuffer);
+
+    Symbol.serialize(sourceJoinSymbol, byteBuffer);
+    Symbol.serialize(filteringSourceJoinSymbol, byteBuffer);
+    Symbol.serialize(semiJoinOutput, byteBuffer);
+  }
+
+  /** Stream variant of {@link #serializeAttributes(ByteBuffer)}; writes the same fields. */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.TABLE_SEMI_JOIN_NODE.serialize(stream);
+
+    Symbol.serialize(sourceJoinSymbol, stream);
+    Symbol.serialize(filteringSourceJoinSymbol, stream);
+    Symbol.serialize(semiJoinOutput, stream);
+  }
+
+  /**
+   * Reads the three symbols and then the plan node id from the buffer and returns a node without
+   * children.
+   */
+  public static SemiJoinNode deserialize(ByteBuffer byteBuffer) {
+    Symbol sourceJoinSymbol = Symbol.deserialize(byteBuffer);
+    Symbol filteringSourceJoinSymbol = Symbol.deserialize(byteBuffer);
+    Symbol semiJoinOutput = Symbol.deserialize(byteBuffer);
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new SemiJoinNode(
+        planNodeId, null, null, sourceJoinSymbol, filteringSourceJoinSymbol, semiJoinOutput);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index 30491784287..42535ef68ef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@ -32,6 +32,9 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Me
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimits;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneAggregationColumns;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneAggregationSourceColumns;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplyColumns;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplyCorrelation;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplySourceColumns;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneCorrelatedJoinColumns;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneCorrelatedJoinCorrelation;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneEnforceSingleRowColumns;
@@ -53,7 +56,9 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Re
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveRedundantEnforceSingleRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveRedundantIdentityProjections;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveTrivialFilters;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveUnreferencedScalarSubqueries;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.SimplifyExpressions;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToSemiJoin;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.TransformUncorrelatedSubqueryToJoin;
 
 import com.google.common.collect.ImmutableList;
@@ -77,6 +82,9 @@ public class LogicalOptimizeFactory {
             // TODO After ValuesNode introduced
             // new RemoveEmptyGlobalAggregation(),
             new PruneAggregationSourceColumns(),
+            new PruneApplyColumns(),
+            new PruneApplyCorrelation(),
+            new PruneApplySourceColumns(),
             new PruneCorrelatedJoinColumns(),
             new PruneCorrelatedJoinCorrelation(),
             new PruneEnforceSingleRowColumns(),
@@ -203,8 +211,9 @@ public class LogicalOptimizeFactory {
             plannerContext,
             ruleStats,
             ImmutableSet.of(
-                new RemoveRedundantEnforceSingleRowNode(),
-                new TransformUncorrelatedSubqueryToJoin())),
+                new RemoveRedundantEnforceSingleRowNode(), new 
RemoveUnreferencedScalarSubqueries(),
+                new TransformUncorrelatedSubqueryToJoin(),
+                    new TransformUncorrelatedInPredicateSubqueryToSemiJoin())),
         new CheckSubqueryNodesAreRewritten(),
         simplifyOptimizer,
         new PushPredicateIntoTableScan(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
index aedd97bff6d..736272117a1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
@@ -658,6 +659,34 @@ public class UnaliasSymbolReferences implements 
PlanOptimizer {
               node.isSpillable()),
           outputMapping);
     }
+
+    /**
+     * Rewrites both children of the semi join, merges their symbol mappings and rebuilds the node
+     * with its three symbols mapped through the merged mapping.
+     */
+    @Override
+    public PlanAndMappings visitSemiJoin(SemiJoinNode node, UnaliasContext context) {
+      // it is assumed that symbols are distinct between SemiJoin source and filtering source. Only
+      // symbols from outer correlation might be the exception
+      PlanAndMappings source = node.getSource().accept(this, context);
+      PlanAndMappings filteringSource = node.getFilteringSource().accept(this, context);
+
+      // Filtering-source mappings are added last, so they overwrite source mappings on a clash.
+      Map<Symbol, Symbol> mergedMappings = new HashMap<>(source.getMappings());
+      mergedMappings.putAll(filteringSource.getMappings());
+
+      SymbolMapper mapper = symbolMapper(mergedMappings);
+      Symbol mappedSourceSymbol = mapper.map(node.getSourceJoinSymbol());
+      Symbol mappedFilteringSymbol = mapper.map(node.getFilteringSourceJoinSymbol());
+      Symbol mappedOutput = mapper.map(node.getSemiJoinOutput());
+
+      SemiJoinNode rewritten =
+          new SemiJoinNode(
+              node.getPlanNodeId(),
+              source.getRoot(),
+              filteringSource.getRoot(),
+              mappedSourceSymbol,
+              mappedFilteringSymbol,
+              mappedOutput);
+      return new PlanAndMappings(rewritten, mergedMappings);
+    }
   }
 
   private static class UnaliasContext {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
index 1a2e3b9f3a2..eb3b6b98fd5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
@@ -477,6 +477,14 @@ public abstract class AstVisitor<R, C> {
     return visitDataType(node, context);
   }
 
+  /** Visits a {@link RowDataType}; the default implementation delegates to visitDataType. */
+  protected R visitRowDataType(RowDataType node, C context) {
+    return visitDataType(node, context);
+  }
+
+  /** Visits a {@link RowDataType.Field}; the default implementation delegates to visitNode. */
+  protected R visitRowField(RowDataType.Field node, C context) {
+    return visitNode(node, context);
+  }
+
   protected R visitDataTypeParameter(DataTypeParameter node, C context) {
     return visitNode(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RowDataType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RowDataType.java
new file mode 100644
index 00000000000..77452dab7f9
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RowDataType.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/** AST node for a row data type, made of an ordered list of {@link Field}s. */
+public class RowDataType extends DataType {
+  private final List<Field> fields;
+
+  public RowDataType(Optional<NodeLocation> location, List<Field> fields) {
+    this(location.orElse(null), fields);
+  }
+
+  public RowDataType(NodeLocation location, List<Field> fields) {
+    super(location);
+    // Defensive immutable copy of the caller's list.
+    this.fields = ImmutableList.copyOf(fields);
+  }
+
+  public List<Field> getFields() {
+    return fields;
+  }
+
+  /** The children of a row type are exactly its fields. */
+  @Override
+  public List<? extends Node> getChildren() {
+    return fields;
+  }
+
+  @Override
+  public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+    return visitor.visitRowDataType(this, context);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    return o != null && getClass() == o.getClass() && fields.equals(((RowDataType) o).fields);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(fields);
+  }
+
+  @Override
+  public boolean shallowEquals(Node other) {
+    return sameClass(this, other);
+  }
+
+  /** A single row field: an optional name plus a data type. */
+  public static class Field extends Node {
+    private final Optional<Identifier> name;
+    private final DataType type;
+
+    public Field(Optional<NodeLocation> location, Optional<Identifier> name, DataType type) {
+      this(location.orElse(null), name, type);
+    }
+
+    public Field(NodeLocation location, Optional<Identifier> name, DataType type) {
+      super(location);
+
+      this.name = requireNonNull(name, "name is null");
+      this.type = requireNonNull(type, "type is null");
+    }
+
+    public Optional<Identifier> getName() {
+      return name;
+    }
+
+    public DataType getType() {
+      return type;
+    }
+
+    /** Returns the name (when present) followed by the type. */
+    @Override
+    public List<? extends Node> getChildren() {
+      if (name.isPresent()) {
+        return ImmutableList.<Node>of(name.get(), type);
+      }
+      return ImmutableList.<Node>of(type);
+    }
+
+    @Override
+    protected <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+      return visitor.visitRowField(this, context);
+    }
+
+    /** Renders as "name type", or just "type" for an unnamed field. */
+    @Override
+    public String toString() {
+      return name.map(identifier -> identifier + " ").orElse("") + type;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      Field that = (Field) o;
+      return name.equals(that.name) && type.equals(that.type);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(name, type);
+    }
+
+    @Override
+    public boolean shallowEquals(Node other) {
+      return sameClass(this, other);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/ExpressionFormatter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/ExpressionFormatter.java
index e36a272b363..c584947c967 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/ExpressionFormatter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/ExpressionFormatter.java
@@ -64,6 +64,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QuantifiedComparisonExpression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Row;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RowDataType;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SearchedCaseExpression;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleCaseExpression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleGroupBy;
@@ -532,6 +533,25 @@ public final class ExpressionFormatter {
       return result.toString();
     }
 
+    /** Formats a row type as {@code ROW(field, field, ...)}. */
+    @Override
+    protected String visitRowDataType(RowDataType node, Void context) {
+      return node.getFields().stream()
+          .map(this::process)
+          .collect(joining(", ", "ROW(", ")"));
+    }
+
+    /** Formats a row field as its name (when present), a space, and its formatted type. */
+    @Override
+    protected String visitRowField(RowDataType.Field node, Void context) {
+      // The name is formatted before the type, matching the left-to-right output order.
+      String namePrefix = node.getName().map(name -> process(name, context) + " ").orElse("");
+      return namePrefix + process(node.getType(), context);
+    }
+
     @Override
     protected String visitTypeParameter(TypeParameter node, Void context) {
       return process(node.getValue(), context);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/InternalTypeManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/InternalTypeManager.java
index 6f2aaef76ab..b3b7fb77154 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/InternalTypeManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/InternalTypeManager.java
@@ -23,10 +23,14 @@ import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.type.Type;
 import org.apache.tsfile.read.common.type.TypeEnum;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.type.RowParametricType.ROW;
 import static org.apache.tsfile.read.common.type.BinaryType.TEXT;
 import static org.apache.tsfile.read.common.type.BlobType.BLOB;
 import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
@@ -43,6 +47,8 @@ public class InternalTypeManager implements TypeManager {
 
   private final ConcurrentMap<TypeSignature, Type> types = new 
ConcurrentHashMap<>();
 
+  private final ConcurrentMap<String, ParametricType> parametricTypes = new 
ConcurrentHashMap<>();
+
   public InternalTypeManager() {
     types.put(new 
TypeSignature(TypeEnum.DOUBLE.name().toLowerCase(Locale.ENGLISH)), DOUBLE);
     types.put(new 
TypeSignature(TypeEnum.FLOAT.name().toLowerCase(Locale.ENGLISH)), FLOAT);
@@ -55,17 +61,53 @@ public class InternalTypeManager implements TypeManager {
     types.put(new 
TypeSignature(TypeEnum.DATE.name().toLowerCase(Locale.ENGLISH)), DATE);
     types.put(new 
TypeSignature(TypeEnum.TIMESTAMP.name().toLowerCase(Locale.ENGLISH)), 
TIMESTAMP);
     types.put(new 
TypeSignature(TypeEnum.UNKNOWN.name().toLowerCase(Locale.ENGLISH)), UNKNOWN);
+
+    addParametricType(ROW);
   }
 
   @Override
   public Type getType(TypeSignature signature) throws TypeNotFoundException {
     Type type = types.get(signature);
     if (type == null) {
-      throw new TypeNotFoundException(signature);
+      // No directly registered type matches the signature: try to build one from a registered
+      // parametric type instead; that path throws TypeNotFoundException when it cannot.
+      return instantiateParametricType(signature);
     }
     return type;
   }
 
+  /**
+   * Registers a parametric type under its lower-cased name.
+   *
+   * @param parametricType the parametric type to register
+   * @throws IllegalArgumentException if a parametric type with the same name is already
+   *     registered
+   */
+  private void addParametricType(ParametricType parametricType) {
+    String name = parametricType.getName().toLowerCase(Locale.ENGLISH);
+    // putIfAbsent performs the duplicate check and the insertion as one atomic step; a non-null
+    // return value means the name was already taken and the map was left unchanged.
+    ParametricType previous = parametricTypes.putIfAbsent(name, parametricType);
+    checkArgument(previous == null, "Parametric type already registered: %s", name);
+  }
+
+  /**
+   * Builds a type from the parametric type registered under the signature's lower-cased base
+   * name, passing it the signature's resolved parameters.
+   *
+   * @throws TypeNotFoundException if no parametric type is registered for the base name, or if
+   *     creating the type fails with an IllegalArgumentException
+   */
+  private Type instantiateParametricType(TypeSignature signature) {
+    // Parameters are resolved before the base-name lookup, so a failure while resolving a
+    // parameter surfaces first.
+    List<TypeParameter> resolvedParameters = new ArrayList<>();
+    for (TypeSignatureParameter signatureParameter : signature.getParameters()) {
+      resolvedParameters.add(TypeParameter.of(signatureParameter, this));
+    }
+
+    String baseName = signature.getBase().toLowerCase(Locale.ENGLISH);
+    ParametricType parametricType = parametricTypes.get(baseName);
+    if (parametricType == null) {
+      throw new TypeNotFoundException(signature);
+    }
+
+    try {
+      return parametricType.createType(this, resolvedParameters);
+    } catch (IllegalArgumentException e) {
+      // Keep the cause so the reason the parameters were rejected is not lost.
+      throw new TypeNotFoundException(signature, e);
+    }
+  }
+
   @Override
   public Type fromSqlType(String type) {
     throw new UnsupportedOperationException();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/NamedType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/NamedType.java
new file mode 100644
index 00000000000..93e65cfab7b
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/NamedType.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.type;
+
+import org.apache.tsfile.read.common.type.Type;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/** Immutable pair of an optional row field name and the type associated with it. */
+public class NamedType {
+  private final Optional<RowFieldName> name;
+  private final Type type;
+
+  /**
+   * @param name the field name, or empty when the field is anonymous
+   * @param type the type of the field
+   */
+  public NamedType(Optional<RowFieldName> name, Type type) {
+    this.name = name;
+    this.type = type;
+  }
+
+  /** Returns the optional field name. */
+  public Optional<RowFieldName> getName() {
+    return name;
+  }
+
+  /** Returns the field type. */
+  public Type getType() {
+    return type;
+  }
+
+  /** Two named types are equal when they have the same class, name and type. */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+
+    NamedType that = (NamedType) o;
+    if (!Objects.equals(name, that.name)) {
+      return false;
+    }
+    return Objects.equals(type, that.type);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, type);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/ParametricType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/ParametricType.java
new file mode 100644
index 00000000000..07c5d8a2899
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/ParametricType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.type;
+
+import org.apache.tsfile.read.common.type.Type;
+
+import java.util.List;
+
+/**
+ * A named family of types from which concrete {@link Type} instances are created by supplying
+ * a list of {@link TypeParameter}s.
+ */
+public interface ParametricType {
+  /** Returns the name that identifies this parametric type. */
+  String getName();
+
+  /**
+   * Creates a concrete type from the given parameters.
+   *
+   * @param typeManager the type manager made available to the implementation
+   * @param parameters the parameters describing the type to create
+   * @return the created type
+   */
+  Type createType(TypeManager typeManager, List<TypeParameter> parameters);
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/RowParametricType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/RowParametricType.java
new file mode 100644
index 00000000000..005f6b2c419
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/RowParametricType.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.type;
+
+import org.apache.tsfile.read.common.type.RowType;
+import org.apache.tsfile.read.common.type.Type;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Singleton {@link ParametricType} that creates {@link RowType} instances from a list of
+ * named-type parameters.
+ */
+public final class RowParametricType implements ParametricType {
+  public static final RowParametricType ROW = new RowParametricType();
+
+  private RowParametricType() {}
+
+  @Override
+  public String getName() {
+    return StandardTypes.ROW;
+  }
+
+  /**
+   * Creates a row type with one field per parameter, in parameter order.
+   *
+   * @param typeManager not used by this implementation
+   * @param parameters the row fields; must be non-empty and consist only of NAMED_TYPE parameters
+   * @return the row type built from the fields
+   * @throws IllegalArgumentException if the list is empty or holds a non-NAMED_TYPE parameter
+   */
+  @Override
+  public Type createType(TypeManager typeManager, List<TypeParameter> parameters) {
+    checkArgument(!parameters.isEmpty(), "Row type must have at least one parameter");
+    boolean onlyNamedTypes =
+        parameters.stream().noneMatch(p -> p.getKind() != ParameterKind.NAMED_TYPE);
+    checkArgument(onlyNamedTypes, "Expected only named types as a parameters, got %s", parameters);
+
+    List<RowType.Field> rowFields =
+        parameters.stream().map(RowParametricType::toField).collect(toList());
+    return RowType.createWithTypeSignature(rowFields);
+  }
+
+  /** Converts one named-type parameter into a row field, unwrapping the optional field name. */
+  private static RowType.Field toField(TypeParameter parameter) {
+    NamedType namedType = parameter.getNamedType();
+    return new RowType.Field(namedType.getName().map(RowFieldName::getName), namedType.getType());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/TypeParameter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/TypeParameter.java
new file mode 100644
index 00000000000..9c4aa133353
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/TypeParameter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.type;
+
+import org.apache.tsfile.read.common.type.Type;
+
+import java.util.Objects;
+
+import static java.lang.String.format;
+
+/**
+ * A parameter of a parametric type: a {@link ParameterKind} tag together with the value carried
+ * for that kind (a type, a long literal, a named type, or a variable name).
+ */
+public class TypeParameter {
+  private final ParameterKind kind;
+  private final Object value;
+
+  private TypeParameter(ParameterKind kind, Object value) {
+    this.kind = kind;
+    this.value = value;
+  }
+
+  /** Creates a TYPE parameter. */
+  public static TypeParameter of(Type type) {
+    return new TypeParameter(ParameterKind.TYPE, type);
+  }
+
+  /** Creates a LONG parameter. */
+  public static TypeParameter of(long longLiteral) {
+    return new TypeParameter(ParameterKind.LONG, longLiteral);
+  }
+
+  /** Creates a NAMED_TYPE parameter. */
+  public static TypeParameter of(NamedType namedType) {
+    return new TypeParameter(ParameterKind.NAMED_TYPE, namedType);
+  }
+
+  /** Creates a VARIABLE parameter. */
+  public static TypeParameter of(String variable) {
+    return new TypeParameter(ParameterKind.VARIABLE, variable);
+  }
+
+  /**
+   * Converts a signature-level parameter into a resolved parameter, using the type manager to
+   * look up the types referenced by TYPE and NAMED_TYPE parameters.
+   *
+   * @param parameter the signature parameter to convert
+   * @param typeManager the manager used to resolve nested type signatures
+   * @return the resolved parameter
+   * @throws UnsupportedOperationException if the parameter kind is not handled
+   */
+  public static TypeParameter of(TypeSignatureParameter parameter, TypeManager typeManager) {
+    switch (parameter.getKind()) {
+      case TYPE:
+        return of(typeManager.getType(parameter.getTypeSignature()));
+      case LONG:
+        return of(parameter.getLongLiteral());
+      case NAMED_TYPE:
+        {
+          Type fieldType =
+              typeManager.getType(parameter.getNamedTypeSignature().getTypeSignature());
+          return of(new NamedType(parameter.getNamedTypeSignature().getFieldName(), fieldType));
+        }
+      case VARIABLE:
+        return of(parameter.getVariable());
+      default:
+        throw new UnsupportedOperationException(format("Unsupported parameter [%s]", parameter));
+    }
+  }
+
+  public ParameterKind getKind() {
+    return kind;
+  }
+
+  /**
+   * Returns the value cast to {@code target}, after checking that this parameter has the
+   * expected kind.
+   *
+   * @throws AssertionError if this parameter's kind differs from {@code expectedParameterKind}
+   */
+  public <A> A getValue(ParameterKind expectedParameterKind, Class<A> target) {
+    if (kind == expectedParameterKind) {
+      return target.cast(value);
+    }
+    throw new AssertionError(
+        format("ParameterKind is [%s] but expected [%s]", kind, expectedParameterKind));
+  }
+
+  public boolean isLongLiteral() {
+    return ParameterKind.LONG == kind;
+  }
+
+  public Type getType() {
+    return getValue(ParameterKind.TYPE, Type.class);
+  }
+
+  public Long getLongLiteral() {
+    return getValue(ParameterKind.LONG, Long.class);
+  }
+
+  public NamedType getNamedType() {
+    return getValue(ParameterKind.NAMED_TYPE, NamedType.class);
+  }
+
+  public String getVariable() {
+    return getValue(ParameterKind.VARIABLE, String.class);
+  }
+
+  /** Returns the string form of the carried value. */
+  @Override
+  public String toString() {
+    return value.toString();
+  }
+
+  /** Two parameters are equal when they have the same class, kind and value. */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+
+    TypeParameter that = (TypeParameter) o;
+    if (kind != that.kind) {
+      return false;
+    }
+    return Objects.equals(value, that.value);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(kind, value);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/TypeSignatureTranslator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/TypeSignatureTranslator.java
index 9390cd4826a..383d81a0e66 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/TypeSignatureTranslator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/TypeSignatureTranslator.java
@@ -25,16 +25,23 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DataTypeParameter
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GenericDataType;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NumericParameter;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RowDataType;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TypeParameter;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.read.common.type.RowType;
 import org.apache.tsfile.read.common.type.Type;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Locale;
+import java.util.Optional;
 import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.type.StandardTypes.ROW;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureParameter.namedTypeParameter;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureParameter.numericParameter;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureParameter.typeParameter;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureParameter.typeVariable;
@@ -44,8 +51,25 @@ public class TypeSignatureTranslator {
   private TypeSignatureTranslator() {}
 
   public static DataType toSqlType(Type type) {
-    return new GenericDataType(
-        new Identifier(type.getTypeEnum().name(), false), 
Collections.emptyList());
+    // Row types are rebuilt field by field into a RowDataType: each field keeps its optional
+    // name (as a non-delimited Identifier) and its type is translated recursively.
+    if (type instanceof RowType) {
+      RowType rowType = (RowType) type;
+      return new RowDataType(
+          Optional.empty(),
+          rowType.getFields().stream()
+              .map(
+                  field ->
+                      new RowDataType.Field(
+                          Optional.empty(),
+                          field
+                              .getName()
+                              .map(RowFieldName::new)
+                              .map(fieldName -> new 
Identifier(fieldName.getName(), false)),
+                          toSqlType(field.getType())))
+              .collect(toImmutableList()));
+    } else {
+      // Every other type becomes a parameterless GenericDataType named after its TypeEnum.
+      return new GenericDataType(
+          new Identifier(type.getTypeEnum().name(), false), 
Collections.emptyList());
+    }
   }
 
   public static TypeSignature toTypeSignature(DataType type) {
@@ -56,10 +80,30 @@ public class TypeSignatureTranslator {
     if (type instanceof GenericDataType) {
       return toTypeSignature((GenericDataType) type, typeVariables);
     }
+    if (type instanceof RowDataType) {
+      return toTypeSignature((RowDataType) type, typeVariables);
+    }
 
     throw new UnsupportedOperationException("Unsupported DataType: " + 
type.getClass().getName());
   }
 
+  /**
+   * Translates a row data type into a ROW type signature with one named-type parameter per
+   * field, in field order. Field names, when present, are canonicalized; field types are
+   * translated recursively.
+   */
+  private static TypeSignature toTypeSignature(RowDataType type, Set<String> typeVariables) {
+    ImmutableList.Builder<TypeSignatureParameter> fieldParameters = ImmutableList.builder();
+    for (RowDataType.Field field : type.getFields()) {
+      Optional<RowFieldName> fieldName =
+          field.getName().map(TypeSignatureTranslator::canonicalize).map(RowFieldName::new);
+      TypeSignature fieldSignature = toTypeSignature(field.getType(), typeVariables);
+      fieldParameters.add(namedTypeParameter(new NamedTypeSignature(fieldName, fieldSignature)));
+    }
+
+    return new TypeSignature(ROW, fieldParameters.build());
+  }
+
   private static TypeSignature toTypeSignature(GenericDataType type, 
Set<String> typeVariables) {
     ImmutableList.Builder<TypeSignatureParameter> parameters = 
ImmutableList.builder();
 

Reply via email to