924060929 commented on code in PR #11454:
URL: https://github.com/apache/doris/pull/11454#discussion_r960323068


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeSubquery.java:
##########
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.analysis;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Exists;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.InSubquery;
+import org.apache.doris.nereids.trees.expressions.ScalarSubquery;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
+import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
+import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * AnalyzeSubquery. translate from subquery to correlatedJoin.
+ * In two steps
+ * The first step is to replace the predicate corresponding to the filter 
where the subquery is located.
+ * The second step converts the subquery into an apply node.
+ */
+public class AnalyzeSubquery implements AnalysisRuleFactory {
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                RuleType.ANALYZE_FILTER_SUBQUERY.build(
+                        logicalFilter().thenApply(ctx -> {
+                            LogicalFilter filter = ctx.root;
+                            List<SubqueryExpr> subqueryExprs = 
filter.getPredicates()
+                                    .collect(SubqueryExpr.class::isInstance);
+                            if (subqueryExprs.isEmpty()) {
+                                return filter;
+                            }
+
+                            Optional<Expression> newPredicates =
+                                    
recursiveReplacePredicate(filter.getPredicates());
+                            if (newPredicates.isPresent()) {
+                                return new LogicalFilter<>(
+                                        newPredicates.get(),
+                                        analyzedSubquery(subqueryExprs,
+                                                (LogicalPlan) filter.child(), 
ctx.cascadesContext));
+                            }
+                            return analyzedSubquery(subqueryExprs,
+                                    (LogicalPlan) filter.child(), 
ctx.cascadesContext);
+                        })
+                )
+        );
+    }
+
+    /**
+     * Convert expressions with subqueries in filter.
+     * before:
+     *      1.filter(t1.a = scalarSubquery(output b));
+     *      2.filter(inSubquery);   inSubquery = (t1.a in select ***);
+     *      3.filter(exists);   exists = (select ***);
+     *
+     * after:
+     *      1.filter(t1.a = b);
+     *      2.filter(True);
+     *      3.filter(True);

Review Comment:
   Looks like we should have a rule to eliminate a LogicalFilter whose 
predicate is BooleanLiteral.TRUE.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/ExpressionTranslator.java:
##########
@@ -276,4 +283,30 @@ public Expr visitTimestampArithmetic(TimestampArithmetic 
arithmetic, PlanTransla
                     arithmetic.getDataType().toCatalogDataType());
         }
     }
+
+    public static org.apache.doris.analysis.AssertNumRowsElement 
translateAssert(
+            AssertNumRowsElement assertNumRowsElement) {
+        return new 
org.apache.doris.analysis.AssertNumRowsElement(assertNumRowsElement.getDesiredNumOfRows(),
+                assertNumRowsElement.getSubqueryString(), 
translateAsserTion(assertNumRowsElement.getAssertion()));
+    }
+
+    private static org.apache.doris.analysis.AssertNumRowsElement.Assertion 
translateAsserTion(
+            AssertNumRowsElement.Assertion assertion) {
+        switch (assertion) {
+            case EQ:
+                return Assertion.EQ;
+            case NE:
+                return Assertion.NE;
+            case LT:
+                return Assertion.LT;
+            case LE:
+                return Assertion.LE;
+            case GT:
+                return Assertion.GT;
+            case GE:
+                return Assertion.GE;
+            default:
+                return null;

Review Comment:
   Is returning null safe here? Should we throw an unsupported-type exception instead?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/Scope.java:
##########
@@ -18,28 +18,59 @@
 package org.apache.doris.nereids.rules.analysis;
 
 import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
  * The slot range required for expression analyze.
+ *
+ * slots: The symbols used at this level are stored in slots.
+ * outerScope: The scope information corresponding to the parent is stored in 
outerScope.
+ * ownerSubquery: The subquery corresponding to ownerSubquery.
+ * subqueryToOuterCorrelatedSlots: The slots correlated in the subquery,
+ *                                 only the slots that cannot be resolved at 
this level.
+ *
+ * eg:
+ * t1(k1, v1) / t2(k2, v2)
+ * select * from t1 where t1.k1 = (select sum(k2) from t2 where t1.v1 = t2.v2);
+ *
+ * When analyzing subquery:
+ *
+ * slots: k2, v2;
+ * outerScope:
+ *      slots: k1, v1;
+ *      outerScope: Optional.empty();
+ *      ownerSubquery: Optionsl.empty();
+ *      subqueryToOuterCorrelatedSlots: empty();
+ * ownerSubquery: subquery((select sum(k2) from t2 where t1.v1 = t2.v2));
+ * subqueryToOuterCorrelatedSlots: (subquery, v1);
  */
 public class Scope {
     private final Optional<Scope> outerScope;
     private final List<Slot> slots;
 
-    public Scope(Optional<Scope> outerScope, List<Slot> slots) {
+    private final Optional<SubqueryExpr> ownerSubquery;
+    private Map<SubqueryExpr, List<Slot>> subqueryToOuterCorrelatedSlots;

Review Comment:
   If a scope belongs to a subquery, you don't need to use a map to find the 
subquery's correlated slots, do you?
   Shouldn't it be a List?
   ```java
   List<Slot> correlatedSlots;
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSlotReference.java:
##########
@@ -183,18 +191,29 @@ public Expression visitUnboundAlias(UnboundAlias 
unboundAlias, Void context) {
 
         @Override
         public Slot visitUnboundSlot(UnboundSlot unboundSlot, Void context) {
-            Optional<List<Slot>> boundedOpt = getScope()
-                    .toScopeLink() // Scope Link from inner scope to outer 
scope
-                    .stream()
-                    .map(scope -> bindSlot(unboundSlot, scope.getSlots()))
-                    .filter(slots -> !slots.isEmpty())
-                    .findFirst();
+            Optional<List<Slot>> boundedOpt = 
Optional.of(bindSlot(unboundSlot, getScope().getSlots()));
+            boolean foundInThisScope = !boundedOpt.get().isEmpty();
+            // Currently only looking for symbols on the previous level.
+            if (!foundInThisScope && getScope().getOuterScope().isPresent()) {
+                boundedOpt = Optional.of(bindSlot(unboundSlot,
+                        getScope()
+                        .getOuterScope()
+                        .get()
+                        .getSlots()));
+            }
             if (!boundedOpt.isPresent()) {
                 throw new AnalysisException("Cannot resolve " + 
unboundSlot.toString());
             }
             List<Slot> bounded = boundedOpt.get();
             switch (bounded.size()) {
                 case 1:
+                    if (!foundInThisScope) {
+                        SubqueryExpr subquery = 
getScope().getOuterScope().get().getSubquery().get();
+                        List<Slot> slots = 
getScope().getOuterScope().get().getCorrelatedSlots(subquery) == null

Review Comment:
   ```suggestion
                           List<Slot> correlatedSlots = 
getScope().getOuterScope().get().getCorrelatedSlots(subquery) == null
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ApplyPullFilterOnAgg.java:
##########
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Merge the correlated predicate and agg in the filter under apply.
+ * And keep the unCorrelated predicate under agg.
+ *
+ * Use the correlated column as the group by column of agg,
+ * the output column is the correlated column and the input column.
+ *
+ * before:
+ *              apply
+ *          /              \
+ * Input(output:b)    agg(output:fn,c; group by:null)
+ *                              |
+ *              Filter(correlated predicate(Input.e = this.f)/Unapply 
predicate)
+ *
+ * end:
+ *          apply(correlated predicate(Input.e = this.f))
+ *         /              \
+ * Input(output:b)    agg(output:fn,c; group by:this.f)
+ *                              |
+ *                    Filter(Uncorrelated predicate)

Review Comment:
   According to your code, the aggregate's group-by and output would be changed,
   but this comment does not mention that point.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java:
##########
@@ -146,4 +150,55 @@ public static String toSqlString(String planName, 
Object... variables) {
 
         return stringBuilder.append(" )").toString();
     }
+
+    /**
+     * See if there are correlated columns in a subquery expression.
+     */
+    public static boolean containCorrelatedSlot(List<Expression> 
correlatedSlots, Expression expr) {
+        if (correlatedSlots.isEmpty() || expr == null) {
+            return false;
+        }
+        if (expr instanceof SlotReference) {
+            return correlatedSlots.contains(expr);
+        }
+        for (Expression child : expr.children()) {
+            if (containCorrelatedSlot(correlatedSlots, child)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Get the correlated columns that belong to the subquery,
+     * that is, the correlated columns that can be resolved within the 
subquery.
+     * eg:
+     * select * from t1 where t1.a = (select sum(t2.b) from t2 where t1.c = 
t2.d));
+     * correlatedPredicates : t1.c = t2.d
+     * correlatedSlots : t1.c
+     * return t2.d
+     */
+    public static List<Expression> getCorrelatedSlots(List<Expression> 
correlatedPredicates,
+            List<Expression> correlatedSlots) {
+        List<Expression> slots = new ArrayList<>();
+        correlatedPredicates.stream().forEach(predicate -> {
+            BinaryExpression binaryExpression = (BinaryExpression) predicate;

Review Comment:
   If correlatedPredicates are always BinaryExpressions, you should change the 
parameter type to `List<BinaryExpression>`. Otherwise, you need to check the 
Expression type and throw an exception when it is not supported.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ApplyPullFilterOnProjectUnderAgg.java:
##########
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Swap the order of project and filter under agg in correlated subqueries.
+ * before:
+ *              apply
+ *         /              \
+ * Input(output:b)        agg
+ *                         |
+ *                  Project(output:a)
+ *                         |
+ *              Filter(correlated predicate(Input.e = this.f)/Unapply 
predicate)
+ *                          |
+ *                         child
+ *              apply
+ *         /              \
+ * Input(output:b)        agg
+ *                         |
+ *              Filter(correlated predicate(Input.e = this.f)/Unapply 
predicate)
+ *                         |
+ *                  Project(output:a,child.output)

Review Comment:
   Ditto: this project may add some expressions that the original filter needs; 
you should note that. Also, it is not 'child.output'.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeSubquery.java:
##########
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.analysis;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Exists;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.InSubquery;
+import org.apache.doris.nereids.trees.expressions.ScalarSubquery;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
+import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
+import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * AnalyzeSubquery. translate from subquery to correlatedJoin.
+ * In two steps
+ * The first step is to replace the predicate corresponding to the filter 
where the subquery is located.
+ * The second step converts the subquery into an apply node.
+ */
+public class AnalyzeSubquery implements AnalysisRuleFactory {
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                RuleType.ANALYZE_FILTER_SUBQUERY.build(
+                        logicalFilter().thenApply(ctx -> {
+                            LogicalFilter filter = ctx.root;
+                            List<SubqueryExpr> subqueryExprs = 
filter.getPredicates()
+                                    .collect(SubqueryExpr.class::isInstance);
+                            if (subqueryExprs.isEmpty()) {
+                                return filter;
+                            }
+
+                            Optional<Expression> newPredicates =
+                                    
recursiveReplacePredicate(filter.getPredicates());
+                            if (newPredicates.isPresent()) {
+                                return new LogicalFilter<>(
+                                        newPredicates.get(),
+                                        analyzedSubquery(subqueryExprs,
+                                                (LogicalPlan) filter.child(), 
ctx.cascadesContext));
+                            }
+                            return analyzedSubquery(subqueryExprs,
+                                    (LogicalPlan) filter.child(), 
ctx.cascadesContext);
+                        })
+                )
+        );
+    }
+
+    /**
+     * Convert expressions with subqueries in filter.
+     * before:
+     *      1.filter(t1.a = scalarSubquery(output b));
+     *      2.filter(inSubquery);   inSubquery = (t1.a in select ***);
+     *      3.filter(exists);   exists = (select ***);
+     *
+     * after:
+     *      1.filter(t1.a = b);
+     *      2.filter(True);
+     *      3.filter(True);
+     */
+    private Optional<Expression> replaceSubquery(SubqueryExpr expr) {
+        if (expr instanceof InSubquery || expr instanceof Exists) {
+            return Optional.of(BooleanLiteral.TRUE);
+        } else if (expr instanceof ScalarSubquery) {
+            return Optional.of(expr.getQueryPlan().getOutput().get(0));
+        }
+        return Optional.empty();
+    }
+
+    private Optional<Expression> recursiveReplacePredicate(Expression 
oldPredicate) {
+        List<Optional<Expression>> newChildren = oldPredicate.children()
+                .stream()
+                .map(this::recursiveReplacePredicate)
+                .filter(Optional::isPresent)
+                .collect(Collectors.toList());
+
+        if (oldPredicate instanceof SubqueryExpr) {
+            return replaceSubquery((SubqueryExpr) oldPredicate);
+        }
+        return newChildren.isEmpty() ? Optional.of(oldPredicate) : 
Optional.of(oldPredicate.withChildren(
+                newChildren.stream().map(expr -> 
expr.get()).collect(Collectors.toList())));

Review Comment:
   > .map(expr -> expr.get())
   
   If you don't check whether the Optional is present, why not remove the 
Optional? Or did you forget to filter for present Optionals here?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeSubquery.java:
##########
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.analysis;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Exists;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.InSubquery;
+import org.apache.doris.nereids.trees.expressions.ScalarSubquery;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
+import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
+import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * AnalyzeSubquery. translate from subquery to correlatedJoin.
+ * In two steps
+ * The first step is to replace the predicate corresponding to the filter 
where the subquery is located.
+ * The second step converts the subquery into an apply node.
+ */
+public class AnalyzeSubquery implements AnalysisRuleFactory {
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                RuleType.ANALYZE_FILTER_SUBQUERY.build(
+                        logicalFilter().thenApply(ctx -> {
+                            LogicalFilter filter = ctx.root;
+                            List<SubqueryExpr> subqueryExprs = 
filter.getPredicates()
+                                    .collect(SubqueryExpr.class::isInstance);
+                            if (subqueryExprs.isEmpty()) {
+                                return filter;
+                            }
+
+                            Optional<Expression> newPredicates =
+                                    
recursiveReplacePredicate(filter.getPredicates());
+                            if (newPredicates.isPresent()) {
+                                return new LogicalFilter<>(
+                                        newPredicates.get(),
+                                        analyzedSubquery(subqueryExprs,
+                                                (LogicalPlan) filter.child(), 
ctx.cascadesContext));
+                            }
+                            return analyzedSubquery(subqueryExprs,
+                                    (LogicalPlan) filter.child(), 
ctx.cascadesContext);
+                        })
+                )
+        );
+    }
+
+    /**
+     * Convert expressions with subqueries in filter.

Review Comment:
   ```suggestion
        * The Subquery in the LogicalFilter will change to LogicalApply, so we 
must replace the origin Subquery
        * in the LogicalFilter(the meaning is remove origin Subquery in the 
LogicalFilter).
        * 
        * The replace rules are:
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/ExpressionTranslator.java:
##########
@@ -276,4 +283,30 @@ public Expr visitTimestampArithmetic(TimestampArithmetic 
arithmetic, PlanTransla
                     arithmetic.getDataType().toCatalogDataType());
         }
     }
+
+    public static org.apache.doris.analysis.AssertNumRowsElement 
translateAssert(
+            AssertNumRowsElement assertNumRowsElement) {
+        return new 
org.apache.doris.analysis.AssertNumRowsElement(assertNumRowsElement.getDesiredNumOfRows(),
+                assertNumRowsElement.getSubqueryString(), 
translateAsserTion(assertNumRowsElement.getAssertion()));
+    }
+
+    private static org.apache.doris.analysis.AssertNumRowsElement.Assertion 
translateAsserTion(

Review Comment:
   ```suggestion
       private static org.apache.doris.analysis.AssertNumRowsElement.Assertion 
translateAssertion(
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeSubquery.java:
##########
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.analysis;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Exists;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.InSubquery;
+import org.apache.doris.nereids.trees.expressions.ScalarSubquery;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
+import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
+import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * AnalyzeSubquery. translate from subquery to correlatedJoin.
+ * In two steps
+ * The first step is to replace the predicate corresponding to the filter 
where the subquery is located.
+ * The second step converts the subquery into an apply node.
+ */
+public class AnalyzeSubquery implements AnalysisRuleFactory {
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                RuleType.ANALYZE_FILTER_SUBQUERY.build(
+                        logicalFilter().thenApply(ctx -> {
+                            LogicalFilter filter = ctx.root;
+                            List<SubqueryExpr> subqueryExprs = 
filter.getPredicates()
+                                    .collect(SubqueryExpr.class::isInstance);
+                            if (subqueryExprs.isEmpty()) {
+                                return filter;
+                            }
+
+                            Optional<Expression> newPredicates =
+                                    
recursiveReplacePredicate(filter.getPredicates());
+                            if (newPredicates.isPresent()) {
+                                return new LogicalFilter<>(
+                                        newPredicates.get(),
+                                        analyzedSubquery(subqueryExprs,
+                                                (LogicalPlan) filter.child(), 
ctx.cascadesContext));
+                            }
+                            return analyzedSubquery(subqueryExprs,
+                                    (LogicalPlan) filter.child(), 
ctx.cascadesContext);
+                        })
+                )
+        );
+    }
+
+    /**
+     * Convert expressions with subqueries in filter.
+     * before:
+     *      1.filter(t1.a = scalarSubquery(output b));
+     *      2.filter(inSubquery);   inSubquery = (t1.a in select ***);
+     *      3.filter(exists);   exists = (select ***);
+     *
+     * after:
+     *      1.filter(t1.a = b);
+     *      2.filter(True);
+     *      3.filter(True);
+     */
+    private Optional<Expression> replaceSubquery(SubqueryExpr expr) {
+        if (expr instanceof InSubquery || expr instanceof Exists) {
+            return Optional.of(BooleanLiteral.TRUE);
+        } else if (expr instanceof ScalarSubquery) {
+            return Optional.of(expr.getQueryPlan().getOutput().get(0));
+        }
+        return Optional.empty();
+    }
+
+    private Optional<Expression> recursiveReplacePredicate(Expression 
oldPredicate) {

Review Comment:
   Why not use ExpressionRewrite to recursively replace the child expressions?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ApplyPullFilterOnAgg.java:
##########
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Merge the correlated predicate and agg in the filter under apply.
+ * And keep the unCorrelated predicate under agg.
+ *
+ * Use the correlated column as the group by column of agg,
+ * the output column is the correlated column and the input column.
+ *
+ * before:
+ *              apply
+ *          /              \
+ * Input(output:b)    agg(output:fn,c; group by:null)
+ *                              |
+ *              Filter(correlated predicate(Input.e = this.f)/Unapply 
predicate)
+ *
+ * end:
+ *          apply(correlated predicate(Input.e = this.f))
+ *         /              \
+ * Input(output:b)    agg(output:fn,c; group by:this.f)
+ *                              |
+ *                    Filter(Uncorrelated predicate)
+ */
+public class ApplyPullFilterOnAgg extends OneRewriteRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalApply(any(), 
logicalAggregate(logicalFilter())).when(LogicalApply::isCorrelated).then(apply 
-> {

Review Comment:
   change `any()` to `group()` because you don't need the left child's plan type



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeSubquery.java:
##########
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.analysis;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Exists;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.InSubquery;
+import org.apache.doris.nereids.trees.expressions.ScalarSubquery;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
+import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
+import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * AnalyzeSubquery. translate from subquery to correlatedJoin.
+ * In two steps
+ * The first step is to replace the predicate corresponding to the filter 
where the subquery is located.
+ * The second step converts the subquery into an apply node.
+ */
+public class AnalyzeSubquery implements AnalysisRuleFactory {
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                RuleType.ANALYZE_FILTER_SUBQUERY.build(
+                        logicalFilter().thenApply(ctx -> {
+                            LogicalFilter filter = ctx.root;
+                            List<SubqueryExpr> subqueryExprs = 
filter.getPredicates()
+                                    .collect(SubqueryExpr.class::isInstance);
+                            if (subqueryExprs.isEmpty()) {
+                                return filter;
+                            }
+
+                            Optional<Expression> newPredicates =
+                                    
recursiveReplacePredicate(filter.getPredicates());
+                            if (newPredicates.isPresent()) {
+                                return new LogicalFilter<>(
+                                        newPredicates.get(),
+                                        analyzedSubquery(subqueryExprs,
+                                                (LogicalPlan) filter.child(), 
ctx.cascadesContext));
+                            }
+                            return analyzedSubquery(subqueryExprs,
+                                    (LogicalPlan) filter.child(), 
ctx.cascadesContext);

Review Comment:
   ```suggestion
                               // analyze subquery has 3 steps.
                               // first step: remove Subquery in the 
LogicalFilter, and replace to some predicate expression
                               Optional<Expression> newPredicates =
                                       
recursiveReplacePredicate(filter.getPredicates());
                               // second step: create a LogicalApply to wrap 
outer plan and subquery plan
                               LogicalPlan outerPlanWithSubqueryPlan = 
analyzedSubquery(subqueryExprs,
                                                   (LogicalPlan) 
filter.child(), ctx.cascadesContext);
                               // third step: wrap filter if necessary, e.g. 
`outer.column1 = inner.column1`
                               return newPredicates.isPresent()
                                       ? new 
LogicalFilter<>(newPredicates.get(), outerPlanWithSubqueryPlan)
                                       : outerPlanWithSubqueryPlan;
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SubqueryExpr.java:
##########
@@ -22,19 +22,29 @@
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.types.DataType;
 
-import com.google.common.base.Preconditions;
-
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
 /**
  * Subquery Expression.
  */
 public abstract class SubqueryExpr extends Expression {
-    protected LogicalPlan queryPlan;
+    protected final LogicalPlan queryPlan;
+    protected final List<Slot> correlateSlots;
 
     public SubqueryExpr(LogicalPlan subquery) {
         this.queryPlan = Objects.requireNonNull(subquery, "subquery can not be 
null");
+        this.correlateSlots = new ArrayList<>();
+    }
+
+    public SubqueryExpr(LogicalPlan subquery, List<Slot> correlateSlots) {
+        this.queryPlan = Objects.requireNonNull(subquery, "subquery can not be 
null");
+        this.correlateSlots = Objects.requireNonNull(correlateSlots, 
"correlateSlots can not be null");

Review Comment:
   change to ImmutableList.copyOf



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SubqueryExpr.java:
##########
@@ -22,19 +22,29 @@
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.types.DataType;
 
-import com.google.common.base.Preconditions;
-
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
 /**
  * Subquery Expression.
  */
 public abstract class SubqueryExpr extends Expression {
-    protected LogicalPlan queryPlan;
+    protected final LogicalPlan queryPlan;
+    protected final List<Slot> correlateSlots;
 
     public SubqueryExpr(LogicalPlan subquery) {
         this.queryPlan = Objects.requireNonNull(subquery, "subquery can not be 
null");
+        this.correlateSlots = new ArrayList<>();

Review Comment:
   change to ImmutableList



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalApply.java:
##########
@@ -29,48 +33,84 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 
 /**
- * Apply Node for subquery.
  * Use this node to display the subquery in the relational algebra tree.
- * refer to "Orthogonal Optimization of Subqueries and Aggregation"
- *
- * @param <LEFT_CHILD_TYPE> input
- * @param <RIGHT_CHILD_TYPE> subquery
+ * @param <LEFT_CHILD_TYPE> input.
+ * @param <RIGHT_CHILD_TYPE> subquery.
  */
 public class LogicalApply<LEFT_CHILD_TYPE extends Plan, RIGHT_CHILD_TYPE 
extends Plan>
         extends LogicalBinary<LEFT_CHILD_TYPE, RIGHT_CHILD_TYPE> {
-    // correlation : subqueries and outer associated columns
-    private final List<Expression> correlation;
+    // correlation column
+    private final List<Expression> correlationSlot;
+    // original subquery
+    private final SubqueryExpr subqueryExpr;
+    // correlation Conjunction
+    private final Optional<Expression> correlationFilter;
+
+    /**
+     * Constructor.
+     */
+    public LogicalApply(Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties,
+            LEFT_CHILD_TYPE leftChild, RIGHT_CHILD_TYPE rightChild, 
List<Expression> correlationSlot,
+            SubqueryExpr subqueryExpr, Optional<Expression> correlationFilter) 
{
+        super(PlanType.LOGICAL_APPLY, groupExpression, logicalProperties, 
leftChild, rightChild);
+        this.correlationSlot = correlationSlot == null ? new ArrayList<>() : 
ImmutableList.copyOf(correlationSlot);

Review Comment:
   Change `new ArrayList<>()` to `ImmutableList.of()`.
   I also suggest moving the children to the end of the constructor's parameter list.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSlotReference.java:
##########
@@ -286,4 +322,55 @@ public List<Slot> getSlots() {
             return (List) children();
         }
     }
+
+    /**
+     * Use the visitor to iterate sub expression.
+     */
+    private static class SubExprAnalyzer<C> extends 
DefaultExpressionRewriter<C> {

Review Comment:
   ```suggestion
       private static class SubExprAnalyzer extends 
DefaultExpressionRewriter<PlannerContext> {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSlotReference.java:
##########
@@ -319,4 +338,153 @@ public List<Slot> getSlots() {
             return (List) children();
         }
     }
+
+    /**
+     * Use the visitor to iterate sub expression.
+     */
+    private static class SubExprAnalyzer<C> extends 
DefaultExpressionRewriter<C> {
+        private final Scope scope;
+        private final CascadesContext cascadesContext;
+
+        public SubExprAnalyzer(Scope scope, CascadesContext cascadesContext) {
+            this.scope = scope;
+            this.cascadesContext = cascadesContext;
+        }
+
+        @Override
+        public Expression visitNot(Not not, C context) {
+            Expression child = not.child();
+            if (child instanceof Exists) {
+                return visitExistsSubquery(
+                        new Exists(((Exists) child).getQueryPlan(), true), 
context);
+            } else if (child instanceof InSubquery) {
+                return visitInSubquery(new InSubquery(((InSubquery) 
child).getCompareExpr(),
+                        ((InSubquery) child).getListQuery(), true), context);
+            }
+            return visit(not, context);
+        }
+
+        @Override
+        public Expression visitExistsSubquery(Exists exists, C context) {
+            AnalyzedResult analyzedResult = analyzeSubquery(exists);
+
+            return new Exists(analyzedResult.getLogicalPlan(),
+                    getSlots(exists, analyzedResult), exists.isNot());
+        }
+
+        @Override
+        public Expression visitInSubquery(InSubquery expr, C context) {
+            AnalyzedResult analyzedResult = analyzeSubquery(expr);
+
+            checkOutputColumn(analyzedResult.getLogicalPlan());
+            checkHasGroupBy(analyzedResult);
+
+            return new InSubquery(
+                    expr.getCompareExpr().accept(this, context),
+                    new ListQuery(analyzedResult.getLogicalPlan()),
+                    getSlots(expr, analyzedResult), expr.isNot());
+        }
+
+        @Override
+        public Expression visitScalarSubquery(ScalarSubquery scalar, C 
context) {
+            AnalyzedResult analyzedResult = analyzeSubquery(scalar);
+
+            checkOutputColumn(analyzedResult.getLogicalPlan());
+            checkRootIsAgg(analyzedResult);
+            checkHasGroupBy(analyzedResult);
+
+            return new ScalarSubquery(analyzedResult.getLogicalPlan(), 
getSlots(scalar, analyzedResult));
+        }
+
+        private void checkOutputColumn(LogicalPlan plan) {
+            if (plan.getOutput().size() != 1) {
+                throw new AnalysisException("Multiple columns returned by 
subquery are not yet supported. Found "
+                        + plan.getOutput().size());
+            }
+        }
+
+        private void checkRootIsAgg(AnalyzedResult analyzedResult) {
+            if (!analyzedResult.isCorrelated()) {
+                return;
+            }
+            if (!analyzedResult.rootIsAgg()) {
+                throw new AnalysisException("The select item in correlated 
subquery of binary predicate "
+                        + "should only be sum, min, max, avg and count. 
Current subquery: "
+                        + analyzedResult.getLogicalPlan());
+            }
+        }
+
+        private void checkHasGroupBy(AnalyzedResult analyzedResult) {
+            if (!analyzedResult.isCorrelated()) {
+                return;
+            }
+            if (analyzedResult.hasGroupBy()) {
+                throw new AnalysisException("Unsupported correlated subquery 
with grouping and/or aggregation "
+                        + analyzedResult.getLogicalPlan());
+            }
+        }
+
+        private AnalyzedResult analyzeSubquery(SubqueryExpr expr) {
+            CascadesContext subqueryContext = new Memo(expr.getQueryPlan())
+                    
.newCascadesContext((cascadesContext.getStatementContext()));
+            Scope subqueryScope = genScopeWithSubquery(expr);
+            subqueryContext
+                    .newAnalyzer(Optional.of(subqueryScope))
+                    .analyze();
+            return new AnalyzedResult((LogicalPlan) 
subqueryContext.getMemo().copyOut(false),
+                    subqueryScope.getCorrelatedSlots(expr));
+        }
+
+        private Scope genScopeWithSubquery(SubqueryExpr expr) {
+            return new Scope(getScope().getOuterScope(),
+                    getScope().getSlots(),
+                    Optional.ofNullable(expr));
+        }
+
+        private List<Slot> getSlots(SubqueryExpr subqueryExpr, AnalyzedResult 
analyzedResult) {
+            return analyzedResult.getCorrelatedSlots().isEmpty()
+                    ? subqueryExpr.getCorrelateSlots() : 
analyzedResult.getCorrelatedSlots();

Review Comment:
   Why can analyzedResult.getCorrelatedSlots() be empty while 
subqueryExpr.getCorrelateSlots() is not empty?
   I think the AnalyzedResult must provide all of the correlated slots.
   A result class that does not contain the whole result is surprising.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ApplyPullFilterOnProjectUnderAgg.java:
##########
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Swap the order of project and filter under agg in correlated subqueries.
+ * before:
+ *              apply
+ *         /              \
+ * Input(output:b)        agg
+ *                         |
+ *                  Project(output:a)
+ *                         |
+ *              Filter(correlated predicate(Input.e = this.f)/Unapply 
predicate)
+ *                          |
+ *                         child
+ *              apply
+ *         /              \
+ * Input(output:b)        agg
+ *                         |
+ *              Filter(correlated predicate(Input.e = this.f)/Unapply 
predicate)
+ *                         |
+ *                  Project(output:a,child.output)
+ *                          |
+ *                         child
+ */
+public class ApplyPullFilterOnProjectUnderAgg extends OneRewriteRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalApply(any(), 
logicalAggregate(logicalProject(logicalFilter())))

Review Comment:
   change `any()` to `group()`



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java:
##########
@@ -146,4 +150,55 @@ public static String toSqlString(String planName, 
Object... variables) {
 
         return stringBuilder.append(" )").toString();
     }
+
+    /**
+     * See if there are correlated columns in a subquery expression.
+     */
+    public static boolean containCorrelatedSlot(List<Expression> 
correlatedSlots, Expression expr) {

Review Comment:
   This function can simplify to
   ```java
   return expr.anyMatch(correlatedSlots::contains);
   ```
   
   The KISS principle: Keep It Short and Simple.
   
   So do not traverse manually, refine and write the minimal core logic.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ApplyPullFilterOnProjectUnderAgg.java:
##########
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Swap the order of project and filter under agg in correlated subqueries.
+ * before:
+ *              apply
+ *         /              \
+ * Input(output:b)        agg
+ *                         |
+ *                  Project(output:a)
+ *                         |
+ *              Filter(correlated predicate(Input.e = this.f)/Unapply 
predicate)
+ *                          |
+ *                         child
+ *              apply
+ *         /              \
+ * Input(output:b)        agg
+ *                         |
+ *              Filter(correlated predicate(Input.e = this.f)/Unapply 
predicate)
+ *                         |
+ *                  Project(output:a,child.output)
+ *                          |
+ *                         child
+ */
+public class ApplyPullFilterOnProjectUnderAgg extends OneRewriteRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalApply(any(), 
logicalAggregate(logicalProject(logicalFilter())))
+                .when(LogicalApply::isCorrelated).then(apply -> {
+                    LogicalAggregate<LogicalProject<LogicalFilter<GroupPlan>>> 
agg = apply.right();
+                    if (!agg.getGroupByExpressions().isEmpty()) {
+                        return apply;
+                    }

Review Comment:
   You can move this condition into the pattern.
   ```java
   logicalApply(
       group(),
       logicalAggregate(
           logicalProject(logicalFilter())
       ).when(agg -> agg.getGroupByExpressions().isEmpty())
   ).when(LogicalApply::isCorrelated).then(apply -> {})
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ExistsApplyToJoin.java:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Exists;
+import org.apache.doris.nereids.trees.expressions.functions.Count;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+
+/**
+ * Convert Existsapply to LogicalJoin.
+ *
+ * Exists
+ *    Correlated -> LEFT_SEMI_JOIN
+ *         apply                  LEFT_SEMI_JOIN(Correlated Predicate)
+ *      /       \         -->       /           \
+ *    input    queryPlan          input        queryPlan
+ *
+ *    UnCorrelated -> CROSS_JOIN(limit(1))
+ *          apply                       CROSS_JOIN
+ *      /           \          -->      /       \
+ *    input        queryPlan          input    limit(1)
+ *                                               |
+ *                                             queryPlan
+ *
+ * Not Exists
+ *    Correlated -> LEFT_ANTI_JOIN
+ *          apply                  LEFT_ANTI_JOIN(Correlated Predicate)
+ *       /       \         -->       /           \
+ *     input    queryPlan          input        queryPlan
+ *
+ *    UnCorrelated -> CROSS_JOIN(Count(1))
+ *         apply                Filter(count(1) = 0)
+ *      /       \         -->       /           \
+ *    input    queryPlan          input        count(1)
+ *                                               |
+ *                                             queryPlan
+ */
+public class ExistsApplyToJoin extends OneRewriteRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalApply().when(LogicalApply::isExist).then(apply -> {
+            if (apply.isCorrelated()) {
+                return correlatedToJoin(apply);
+            } else {
+                return unCorrelatedToJoin(apply);
+            }
+        }).toRule(RuleType.EXISTS_APPLY_TO_JOIN);
+    }
+
+    private Plan correlatedToJoin(LogicalApply apply) {
+        if (((Exists) apply.getSubqueryExpr()).isNot()) {
+            return new LogicalJoin<>(JoinType.LEFT_ANTI_JOIN, 
Lists.newArrayList(), apply.getCorrelationFilter(),
+                    (LogicalPlan) apply.left(), (LogicalPlan) apply.right());
+        } else {
+            return new LogicalJoin<>(JoinType.LEFT_SEMI_JOIN, 
Lists.newArrayList(), apply.getCorrelationFilter(),
+                    (LogicalPlan) apply.left(), (LogicalPlan) apply.right());
+        }
+    }
+
+    private Plan unCorrelatedToJoin(LogicalApply unapply) {
+        if (((Exists) unapply.getSubqueryExpr()).isNot()) {
+            return unCorrelatedNotExist(unapply);
+        } else {
+            return unCorrelatedExist(unapply);
+        }
+    }
+
+    private Plan unCorrelatedNotExist(LogicalApply unapply) {
+        Alias alias = new Alias(new Count(), "count(*)");
+        LogicalAggregate newAgg = new LogicalAggregate<>(new ArrayList<>(),
+                ImmutableList.of(alias),
+                (LogicalPlan) unapply.right());
+        LogicalJoin newJoin = new LogicalJoin<>(JoinType.CROSS_JOIN,
+                (LogicalPlan) unapply.left(), newAgg);
+        return new LogicalFilter<>(new EqualTo(newAgg.getOutput().get(0),
+                new IntegerLiteral(0)), newJoin);

Review Comment:
   Should this change to `Not(EqualTo(xxx, 0))`?
   If we add a `limit(1)` here, you can change it to `EqualTo(xxx, 1)`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ExistsApplyToJoin.java:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Exists;
+import org.apache.doris.nereids.trees.expressions.functions.Count;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+
+/**
+ * Convert Existsapply to LogicalJoin.
+ *
+ * Exists
+ *    Correlated -> LEFT_SEMI_JOIN
+ *         apply                  LEFT_SEMI_JOIN(Correlated Predicate)
+ *      /       \         -->       /           \
+ *    input    queryPlan          input        queryPlan
+ *
+ *    UnCorrelated -> CROSS_JOIN(limit(1))
+ *          apply                       CROSS_JOIN
+ *      /           \          -->      /       \
+ *    input        queryPlan          input    limit(1)
+ *                                               |
+ *                                             queryPlan
+ *
+ * Not Exists
+ *    Correlated -> LEFT_ANTI_JOIN
+ *          apply                  LEFT_ANTI_JOIN(Correlated Predicate)
+ *       /       \         -->       /           \
+ *     input    queryPlan          input        queryPlan
+ *
+ *    UnCorrelated -> CROSS_JOIN(Count(1))
+ *         apply                Filter(count(1) = 0)

Review Comment:
   Is the CROSS_JOIN missing from this diagram?
   Also, should `Count(1)` be changed to `Count(*)`?
   
   And should we add a `limit(1)` for performance?
   ```java
     filter(equal(cnt, 1))
             |
   aggregate(count(*) as cnt)
             |
         limit(1)
             |
         queryPlan
   ```
   
   `count(1)` is not the same as `count(*)`, so the current comment may be wrong.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushApplyUnderProject.java:
##########
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
+import org.apache.doris.nereids.trees.expressions.ScalarSubquery;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Adjust the order of Project and apply in correlated subqueries.
+ *
+ * before:
+ *              apply
+ *         /              \
+ * Input(output:b)    Project(output:a)
+ *                         |
+ *                       child
+ *
+ * after:
+ *          Project(b,(if the Subquery is Scalar add 'a' as the output column))
+ *                  |
+ *                apply
+ *          /               \
+ * Input(output:b)          child
+ */
+public class PushApplyUnderProject extends OneRewriteRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalApply(any(), 
logicalProject()).when(LogicalApply::isCorrelated).then(apply -> {

Review Comment:
   change `any()` to `group()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to