This is an automated email from the ASF dual-hosted git repository.

mbudiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cfe2a5e29 [CALCITE-7031] Implement the general decorrelation 
algorithm (Neumann & Kemper)
5cfe2a5e29 is described below

commit 5cfe2a5e29addd07a9729cf416ec0421db8b7ba3
Author: Silun Dong <[email protected]>
AuthorDate: Thu Nov 6 00:36:58 2025 +0800

    [CALCITE-7031] Implement the general decorrelation algorithm (Neumann & 
Kemper)
---
 .../adapter/enumerable/EnumerableJoinRule.java     |   10 +-
 .../java/org/apache/calcite/plan/RelOptUtil.java   |   25 +-
 .../calcite/rel/core/ConditionalCorrelate.java     |   76 ++
 .../org/apache/calcite/rel/core/Correlate.java     |    7 +
 .../java/org/apache/calcite/rel/core/Join.java     |   17 +-
 .../org/apache/calcite/rel/core/JoinRelType.java   |   32 +-
 .../org/apache/calcite/rel/core/RelFactories.java  |   49 +
 .../rel/logical/LogicalConditionalCorrelate.java   |   90 ++
 .../org/apache/calcite/rel/metadata/RelMdSize.java |   18 +-
 .../org/apache/calcite/rel/metadata/RelMdUtil.java |    3 +
 .../org/apache/calcite/rel/rules/CoreRules.java    |   14 +
 .../rel/rules/MarkToSemiOrAntiJoinRule.java        |  143 +++
 .../calcite/rel/rules/SubQueryRemoveRule.java      |  134 +++
 .../calcite/sql/validate/SqlValidatorUtil.java     |   11 +
 .../sql2rel/TopDownGeneralDecorrelator.java        | 1125 ++++++++++++++++++++
 .../java/org/apache/calcite/tools/RelBuilder.java  |   14 +-
 .../org/apache/calcite/test/RelOptRulesTest.java   |  263 +++++
 .../org/apache/calcite/test/RelOptRulesTest.xml    |  728 +++++++++++++
 .../org/apache/calcite/test/RelOptFixture.java     |   41 +-
 19 files changed, 2767 insertions(+), 33 deletions(-)

diff --git 
a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoinRule.java
 
b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoinRule.java
index 57a6778e0d..fe40d17dc4 100644
--- 
a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoinRule.java
+++ 
b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoinRule.java
@@ -21,10 +21,14 @@
 import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.util.Bug;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -48,8 +52,12 @@ protected EnumerableJoinRule(Config config) {
     super(config);
   }
 
-  @Override public RelNode convert(RelNode rel) {
+  @Override public @Nullable RelNode convert(RelNode rel) {
     Join join = (Join) rel;
+    if (!Bug.TODO_FIXED && join.getJoinType() == JoinRelType.LEFT_MARK) {
+      // TODO implement LEFT MARK join
+      return null;
+    }
     List<RelNode> newInputs = new ArrayList<>();
     for (RelNode input : join.getInputs()) {
       if (!(input.getConvention() instanceof EnumerableConvention)) {
diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java 
b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
index 6698ec9b0d..34724a8adf 100644
--- a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
+++ b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
@@ -3960,12 +3960,25 @@ public static RelNode pushDownJoinConditions(Join 
originalJoin,
               joinCond, left, right, joinType, originalJoin.isSemiJoinDone()));
     }
     if (!extraLeftExprs.isEmpty() || !extraRightExprs.isEmpty()) {
-      final int totalFields = joinType.projectsRight()
-          ? leftCount + extraLeftExprs.size() + rightCount + 
extraRightExprs.size()
-          : leftCount + extraLeftExprs.size();
-      final int[] mappingRanges = joinType.projectsRight()
-          ? new int[] { 0, 0, leftCount, leftCount, leftCount + 
extraLeftExprs.size(), rightCount }
-          : new int[] { 0, 0, leftCount };
+      final int totalFields;
+      final int[] mappingRanges;
+      switch (joinType) {
+      case SEMI:
+      case ANTI:
+        totalFields = leftCount + extraLeftExprs.size();
+        mappingRanges = new int[] { 0, 0, leftCount };
+        break;
+      case LEFT_MARK:
+        totalFields = leftCount + extraLeftExprs.size() + 1;
+        mappingRanges
+            = new int[] { 0, 0, leftCount, leftCount, leftCount + 
extraLeftExprs.size(), 1 };
+        break;
+      default:
+        totalFields = leftCount + extraLeftExprs.size() + rightCount + 
extraRightExprs.size();
+        mappingRanges =
+            new int[] { 0, 0, leftCount, leftCount, leftCount + 
extraLeftExprs.size(), rightCount };
+        break;
+      }
       Mappings.TargetMapping mapping =
           Mappings.createShiftMapping(
               totalFields,
diff --git 
a/core/src/main/java/org/apache/calcite/rel/core/ConditionalCorrelate.java 
b/core/src/main/java/org/apache/calcite/rel/core/ConditionalCorrelate.java
new file mode 100644
index 0000000000..a6527f23fd
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/core/ConditionalCorrelate.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.core;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.rules.CoreRules;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.List;
+
+/**
+ * This is an extension of {@link Correlate} that contains a condition.
+ * When removing SOME/IN subqueries, the condition needs to be retained in the 
left mark type
+ * Correlate (it cannot be pulled up or pushed down). This is why 
ConditionalCorrelate adds
+ * a condition to Correlate.
+ *
+ * @see CoreRules#FILTER_SUB_QUERY_TO_MARK_CORRELATE
+ * @see CoreRules#PROJECT_SUB_QUERY_TO_MARK_CORRELATE
+ */
+public abstract class ConditionalCorrelate extends Correlate {
+
+  private final RexNode condition;
+
+  protected ConditionalCorrelate(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      List<RelHint> hints,
+      RelNode left,
+      RelNode right,
+      CorrelationId correlationId,
+      ImmutableBitSet requiredColumns,
+      JoinRelType joinType,
+      RexNode condition) {
+    super(cluster, traitSet, hints, left, right, correlationId, 
requiredColumns, joinType);
+    this.condition = condition;
+    assert joinType == JoinRelType.LEFT_MARK;
+  }
+
+  @Override public ConditionalCorrelate copy(RelTraitSet traitSet, 
List<RelNode> inputs) {
+    assert inputs.size() == 2;
+    return copy(traitSet, inputs.get(0), inputs.get(1), correlationId,
+        requiredColumns, joinType, condition);
+  }
+
+  public abstract ConditionalCorrelate copy(RelTraitSet traitSet, RelNode 
left, RelNode right,
+      CorrelationId correlationId, ImmutableBitSet requiredColumns, 
JoinRelType joinType,
+      RexNode condition);
+
+  @Override public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw)
+        .itemIf("condition", condition, !condition.isAlwaysTrue());
+  }
+
+  @Override public RexNode getCondition() {
+    return condition;
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Correlate.java 
b/core/src/main/java/org/apache/calcite/rel/core/Correlate.java
index 752dd9bf15..e9d2adbccd 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/Correlate.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/Correlate.java
@@ -29,6 +29,7 @@
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Litmus;
@@ -169,6 +170,7 @@ public JoinRelType getJoinType() {
     switch (joinType) {
     case LEFT:
     case INNER:
+    case LEFT_MARK:
       return SqlValidatorUtil.deriveJoinRowType(left.getRowType(),
           right.getRowType(), joinType,
           getCluster().getTypeFactory(), null,
@@ -211,6 +213,10 @@ public ImmutableBitSet getRequiredColumns() {
     return requiredColumns;
   }
 
+  public RexNode getCondition() {
+    return getCluster().getRexBuilder().makeLiteral(true);
+  }
+
   @Override public Set<CorrelationId> getVariablesSet() {
     return ImmutableSet.of(correlationId);
   }
@@ -220,6 +226,7 @@ public ImmutableBitSet getRequiredColumns() {
     switch (joinType) {
     case SEMI:
     case ANTI:
+    case LEFT_MARK:
       return leftRowCount;
     default:
       return leftRowCount * mq.getRowCount(right);
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Join.java 
b/core/src/main/java/org/apache/calcite/rel/core/Join.java
index 8016e9f36a..1a56edbed0 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/Join.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/Join.java
@@ -152,10 +152,19 @@ public JoinRelType getJoinType() {
     if (!super.isValid(litmus, context)) {
       return false;
     }
-    if (getRowType().getFieldCount()
-        != getSystemFieldList().size()
-        + left.getRowType().getFieldCount()
-        + (joinType.projectsRight() ? right.getRowType().getFieldCount() : 0)) 
{
+    int expectedFieldCount = left.getRowType().getFieldCount();
+    switch (joinType) {
+    case SEMI:
+    case ANTI:
+      break;
+    case LEFT_MARK:
+      expectedFieldCount += 1;
+      break;
+    default:
+      expectedFieldCount += right.getRowType().getFieldCount();
+      break;
+    }
+    if (getRowType().getFieldCount() != expectedFieldCount) {
       return litmus.fail("field count mismatch");
     }
     if (condition != null) {
diff --git a/core/src/main/java/org/apache/calcite/rel/core/JoinRelType.java 
b/core/src/main/java/org/apache/calcite/rel/core/JoinRelType.java
index dbb2891ce2..ffe4a3d103 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/JoinRelType.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/JoinRelType.java
@@ -91,7 +91,32 @@ public enum JoinRelType {
   /**
    * The left version of an ASOF join, where each row from the left table is 
part of the output.
    */
-  LEFT_ASOF;
+  LEFT_ASOF,
+
+  /**
+   * A LEFT MARK JOIN keeps all rows from the left side and creates a new 
attribute to mark a
+   * tuple as having join partners from right side or not. Refer to
+   * <a href="https://dl.gi.de/items/c5f7c49f-1572-490e-976a-cc4292519bdd">
+   *   The Complete Story of Joins (in HyPer)</a>.
+   *
+   * <p>Example:
+   * <blockquote><pre>
+   * SELECT EMPNO FROM EMP
+   * WHERE EXISTS (SELECT 1 FROM DEPT
+   *     WHERE DEPT.DEPTNO = EMP.DEPTNO)
+   *     OR EMPNO &gt; 1
+   *
+   * LogicalProject(EMPNO=[$0])
+   *   LogicalFilter(condition=[OR($9, &gt;($0, 1))])
+   *     LogicalJoin(condition=[IS NOT DISTINCT FROM($7, $9)], 
joinType=[left_mark])
+   *       LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+   *       LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+   * </pre></blockquote>
+   *
+   * <p>If the marker is used only in conjunctive predicates, the optimizer 
will try to translate
+   * the mark join into a semi or anti join.
+   */
+  LEFT_MARK;
 
   /** Lower-case name. */
   public final String lowerName = name().toLowerCase(Locale.ROOT);
@@ -173,7 +198,7 @@ public JoinRelType cancelNullsOnRight() {
   }
 
   public boolean projectsRight() {
-    return this != SEMI && this != ANTI;
+    return this != SEMI && this != ANTI && this != LEFT_MARK;
   }
 
   /** Returns whether this join type accepts pushing predicates from above 
into its predicate. */
@@ -185,7 +210,8 @@ public boolean canPushIntoFromAbove() {
   /** Returns whether this join type accepts pushing predicates from above 
into its left input. */
   @API(since = "1.28", status = API.Status.EXPERIMENTAL)
   public boolean canPushLeftFromAbove() {
-    return (this == INNER) || (this == LEFT) || (this == SEMI) || (this == 
ANTI);
+    return (this == INNER) || (this == LEFT) || (this == SEMI)
+        || (this == ANTI) || (this == LEFT_MARK);
   }
 
   /** Returns whether this join type accepts pushing predicates from above 
into its right input. */
diff --git a/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java 
b/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
index 9844219ecb..e2a096762f 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
@@ -29,6 +29,7 @@
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalAsofJoin;
+import org.apache.calcite.rel.logical.LogicalConditionalCorrelate;
 import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalExchange;
 import org.apache.calcite.rel.logical.LogicalFilter;
@@ -91,6 +92,9 @@ public class RelFactories {
   public static final CorrelateFactory DEFAULT_CORRELATE_FACTORY =
       new CorrelateFactoryImpl();
 
+  public static final ConditionalCorrelateFactory 
DEFAULT_CORRELATE_PLUS_FACTORY =
+      new ConditionalCorrelateFactoryImpl();
+
   public static final SortFactory DEFAULT_SORT_FACTORY =
       new SortFactoryImpl();
 
@@ -144,6 +148,7 @@ public class RelFactories {
           DEFAULT_JOIN_FACTORY,
           DEFAULT_ASOFJOIN_FACTORY,
           DEFAULT_CORRELATE_FACTORY,
+          DEFAULT_CORRELATE_PLUS_FACTORY,
           DEFAULT_VALUES_FACTORY,
           DEFAULT_TABLE_SCAN_FACTORY,
           DEFAULT_TABLE_FUNCTION_SCAN_FACTORY,
@@ -485,6 +490,44 @@ private static class CorrelateFactoryImpl implements 
CorrelateFactory {
     }
   }
 
+  /**
+   * Can create a ConditionalCorrelate of the appropriate type for a rule's 
calling
+   * convention.
+   *
+   * <p>The result is typically a {@link ConditionalCorrelate}.
+   */
+  public interface ConditionalCorrelateFactory {
+
+    /**
+     * Creates a ConditionalCorrelate.
+     *
+     * @param left             Left input
+     * @param right            Right input
+     * @param hints            Hints
+     * @param correlationId    Variable name for the row of left input
+     * @param requiredColumns  Required columns
+     * @param joinType         Join type
+     * @param condition        Join condition
+     */
+    RelNode createConditionalCorrelate(RelNode left, RelNode right, 
List<RelHint> hints,
+        CorrelationId correlationId, ImmutableBitSet requiredColumns,
+        JoinRelType joinType, RexNode condition);
+  }
+
+  /**
+   * Implementation of {@link ConditionalCorrelateFactory} that returns a 
vanilla
+   * {@link LogicalConditionalCorrelate}.
+   */
+  private static class ConditionalCorrelateFactoryImpl implements 
ConditionalCorrelateFactory {
+
+    @Override public RelNode createConditionalCorrelate(RelNode left, RelNode 
right,
+        List<RelHint> hints, CorrelationId correlationId, ImmutableBitSet 
requiredColumns,
+        JoinRelType joinType, RexNode condition) {
+      return LogicalConditionalCorrelate.create(left, right, hints, 
correlationId,
+          requiredColumns, joinType, condition);
+    }
+  }
+
   /**
    * Can create a semi-join of the appropriate type for a rule's calling
    * convention.
@@ -739,6 +782,7 @@ public static class Struct {
     public final JoinFactory joinFactory;
     public final AsofJoinFactory asofJoinFactory;
     public final CorrelateFactory correlateFactory;
+    public final ConditionalCorrelateFactory conditionalCorrelateFactory;
     public final ValuesFactory valuesFactory;
     public final TableScanFactory scanFactory;
     public final TableFunctionScanFactory tableFunctionScanFactory;
@@ -759,6 +803,7 @@ private Struct(FilterFactory filterFactory,
         JoinFactory joinFactory,
         AsofJoinFactory asofJoinFactory,
         CorrelateFactory correlateFactory,
+        ConditionalCorrelateFactory conditionalCorrelateFactory,
         ValuesFactory valuesFactory,
         TableScanFactory scanFactory,
         TableFunctionScanFactory tableFunctionScanFactory,
@@ -778,6 +823,8 @@ private Struct(FilterFactory filterFactory,
       this.joinFactory = requireNonNull(joinFactory, "joinFactory");
       this.asofJoinFactory = requireNonNull(asofJoinFactory, 
"asofJoinFactory");
       this.correlateFactory = requireNonNull(correlateFactory, 
"correlateFactory");
+      this.conditionalCorrelateFactory =
+          requireNonNull(conditionalCorrelateFactory, 
"conditionalCorrelateFactory");
       this.valuesFactory = requireNonNull(valuesFactory, "valuesFactory");
       this.scanFactory = requireNonNull(scanFactory, "scanFactory");
       this.tableFunctionScanFactory =
@@ -816,6 +863,8 @@ public static Struct fromContext(Context context) {
               .orElse(DEFAULT_ASOFJOIN_FACTORY),
           context.maybeUnwrap(CorrelateFactory.class)
               .orElse(DEFAULT_CORRELATE_FACTORY),
+          context.maybeUnwrap(ConditionalCorrelateFactory.class)
+                  .orElse(DEFAULT_CORRELATE_PLUS_FACTORY),
           context.maybeUnwrap(ValuesFactory.class)
               .orElse(DEFAULT_VALUES_FACTORY),
           context.maybeUnwrap(TableScanFactory.class)
diff --git 
a/core/src/main/java/org/apache/calcite/rel/logical/LogicalConditionalCorrelate.java
 
b/core/src/main/java/org/apache/calcite/rel/logical/LogicalConditionalCorrelate.java
new file mode 100644
index 0000000000..2df2f839c5
--- /dev/null
+++ 
b/core/src/main/java/org/apache/calcite/rel/logical/LogicalConditionalCorrelate.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.ConditionalCorrelate;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.List;
+
+/**
+ * Sub-class of {@link ConditionalCorrelate} not targeted at any particular 
engine or calling convention.
+ */
+public final class LogicalConditionalCorrelate extends ConditionalCorrelate {
+  //~ Instance fields --------------------------------------------------------
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a LogicalConditionalCorrelate.
+   *
+   * @param cluster         Cluster this relational expression belongs to
+   * @param left            Left input relational expression
+   * @param right           Right input relational expression
+   * @param correlationId   Variable name for the row of left input
+   * @param requiredColumns Required columns
+   * @param joinType        Join type
+   * @param condition       Join condition
+   */
+  public LogicalConditionalCorrelate(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      List<RelHint> hints,
+      RelNode left,
+      RelNode right,
+      CorrelationId correlationId,
+      ImmutableBitSet requiredColumns,
+      JoinRelType joinType,
+      RexNode condition) {
+    super(cluster, traitSet, hints, left, right, correlationId,
+        requiredColumns, joinType, condition);
+  }
+
+  /** Creates a LogicalConditionalCorrelate. */
+  public static LogicalConditionalCorrelate create(RelNode left, RelNode 
right, List<RelHint> hints,
+      CorrelationId correlationId, ImmutableBitSet requiredColumns, 
JoinRelType joinType,
+      RexNode condition) {
+    final RelOptCluster cluster = left.getCluster();
+    final RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE);
+    return new LogicalConditionalCorrelate(cluster, traitSet, hints, left, 
right, correlationId,
+        requiredColumns, joinType, condition);
+  }
+
+  @Override public ConditionalCorrelate copy(RelTraitSet traitSet, RelNode 
left, RelNode right,
+      CorrelationId correlationId, ImmutableBitSet requiredColumns, 
JoinRelType joinType,
+      RexNode condition) {
+    assert traitSet.containsIfApplicable(Convention.NONE);
+    return new LogicalConditionalCorrelate(getCluster(), traitSet, hints, 
left, right,
+        correlationId, requiredColumns, joinType, condition);
+  }
+
+  @Override public Correlate copy(RelTraitSet traitSet,
+      RelNode left, RelNode right, CorrelationId correlationId,
+      ImmutableBitSet requiredColumns, JoinRelType joinType) {
+    // This method does not provide the condition as an argument, so it should 
never be called
+    throw new RuntimeException("This method should not be called");
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdSize.java 
b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdSize.java
index b6a687e91d..675aa1e363 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdSize.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdSize.java
@@ -25,6 +25,7 @@
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Intersect;
 import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Minus;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.Sort;
@@ -45,6 +46,7 @@
 import org.apache.calcite.util.Pair;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -206,14 +208,24 @@ protected RelMdSize() {}
     return averageJoinColumnSizes(rel, mq);
   }
 
-  private static @Nullable List<@Nullable Double> averageJoinColumnSizes(Join 
rel,
+  private @Nullable List<@Nullable Double> averageJoinColumnSizes(Join rel,
       RelMetadataQuery mq) {
     boolean semiOrAntijoin = !rel.getJoinType().projectsRight();
     final RelNode left = rel.getLeft();
     final RelNode right = rel.getRight();
     final @Nullable List<@Nullable Double> lefts = 
mq.getAverageColumnSizes(left);
-    final @Nullable List<@Nullable Double> rights =
-        semiOrAntijoin ? null : mq.getAverageColumnSizes(right);
+    final @Nullable List<@Nullable Double> rights;
+    if (semiOrAntijoin) {
+      if (rel.getJoinType() == JoinRelType.LEFT_MARK) {
+        RelDataTypeField markColType =
+            
rel.getRowType().getFieldList().get(rel.getRowType().getFieldCount() - 1);
+        rights = Lists.newArrayList(averageFieldValueSize(markColType));
+      } else {
+        rights = null;
+      }
+    } else {
+      rights = mq.getAverageColumnSizes(right);
+    }
     if (lefts == null && rights == null) {
       return null;
     }
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUtil.java 
b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUtil.java
index dc12983f3f..227e19a852 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUtil.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUtil.java
@@ -813,6 +813,9 @@ public static double getMinusRowCount(RelMetadataQuery mq, 
Minus minus) {
   public static @Nullable Double getJoinRowCount(RelMetadataQuery mq, Join 
join,
       RexNode condition) {
     if (!join.getJoinType().projectsRight()) {
+      if (join.getJoinType() == JoinRelType.LEFT_MARK) {
+        return mq.getRowCount(join.getLeft());
+      }
       // Create a RexNode representing the selectivity of the
       // semijoin filter and pass it to getSelectivity
       RexNode semiJoinSelectivity =
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/CoreRules.java 
b/core/src/main/java/org/apache/calcite/rel/rules/CoreRules.java
index 938df4685b..c604f71358 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/CoreRules.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/CoreRules.java
@@ -498,6 +498,20 @@ private CoreRules() {}
   public static final SubQueryRemoveRule JOIN_SUB_QUERY_TO_CORRELATE =
       SubQueryRemoveRule.Config.JOIN.toRule();
 
+  /** Rule that converts sub-queries from filter expressions into
+   * {@link Correlate} instances. It will rewrite SOME/EXISTS/IN to a LEFT 
MARK type Correlate. */
+  public static final SubQueryRemoveRule FILTER_SUB_QUERY_TO_MARK_CORRELATE =
+      SubQueryRemoveRule.Config.FILTER_ENABLE_MARK_JOIN.toRule();
+
+  /** Rule that converts sub-queries from project expressions into
+   * {@link Correlate} instances. It will rewrite SOME/EXISTS/IN to a LEFT 
MARK type Correlate. */
+  public static final SubQueryRemoveRule PROJECT_SUB_QUERY_TO_MARK_CORRELATE =
+      SubQueryRemoveRule.Config.PROJECT_ENABLE_MARK_JOIN.toRule();
+
+  /** Rule that converts mark join to semi/anti join. */
+  public static final MarkToSemiOrAntiJoinRule MARK_TO_SEMI_OR_ANTI_JOIN_RULE =
+      MarkToSemiOrAntiJoinRule.Config.DEFAULT.toRule();
+
   /** Rule that converts SUM to SUM0 in OVER clauses in a project list. */
   public static final ProjectOverSumToSum0Rule PROJECT_OVER_SUM_TO_SUM0_RULE =
       ProjectOverSumToSum0Rule.Config.DEFAULT.toRule();
diff --git 
a/core/src/main/java/org/apache/calcite/rel/rules/MarkToSemiOrAntiJoinRule.java 
b/core/src/main/java/org/apache/calcite/rel/rules/MarkToSemiOrAntiJoinRule.java
new file mode 100644
index 0000000000..9c4230db38
--- /dev/null
+++ 
b/core/src/main/java/org/apache/calcite/rel/rules/MarkToSemiOrAntiJoinRule.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.Strong;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.calcite.plan.RelOptUtil.conjunctions;
+
+/**
+ * Rule to simplify a mark join to semi join or anti join. This rule is 
applied by default after
+ * general decorrelation.
+ *
+ * @see org.apache.calcite.sql2rel.TopDownGeneralDecorrelator
+ */
+@Value.Enclosing
+public class MarkToSemiOrAntiJoinRule
+    extends RelRule<MarkToSemiOrAntiJoinRule.Config>
+    implements TransformationRule {
+
+
+  /** Creates a MarkToSemiOrAntiJoinRule. */
+  protected MarkToSemiOrAntiJoinRule(Config config) {
+    super(config);
+  }
+
+  @Override public void onMatch(RelOptRuleCall call) {
+    final Project project = call.rel(0);
+    final Filter filter = call.rel(1);
+    final Join join = call.rel(2);
+    final RelBuilder builder = call.builder();
+
+    int markIndex = join.getRowType().getFieldCount() - 1;
+    ImmutableBitSet projectColumns = 
RelOptUtil.InputFinder.bits(project.getProjects(), null);
+    ImmutableBitSet filterColumns = 
RelOptUtil.InputFinder.bits(filter.getCondition());
+    if (projectColumns.get(markIndex) || !filterColumns.get(markIndex)) {
+      return;
+    }
+
+    // Proj        <- no result of the project depends on marker
+    //   Filter    <- condition depends on marker
+    //     Join    <- mark join
+    // After expressing the filter condition as a conjunction, there are only 
two cases to simplify:
+    // 1. only reference the marker, simplify to semi join
+    // 2. NOT(marker), and the join condition will only return TRUE/FALSE
+    //    (will not return NULL values), simplify to anti join
+    boolean toSemi = false;
+    boolean toAnti = false;
+    List<RexNode> filterConditions = 
RelOptUtil.conjunctions(filter.getCondition());
+    List<RexNode> newFilterConditions = new ArrayList<>();
+    for (RexNode condition : filterConditions) {
+      final ImmutableBitSet inputBits = RelOptUtil.InputFinder.bits(condition);
+      // marker is not referenced
+      if (!inputBits.get(markIndex)) {
+        newFilterConditions.add(condition);
+        continue;
+      }
+
+      // only reference the marker, to semi join
+      if (condition instanceof RexInputRef && !toAnti) {
+        toSemi = true;
+        continue;
+      }
+      // NOT(marker), and the join condition will only return TRUE/FALSE, to 
anti join
+      if (condition instanceof RexCall
+          && condition.isA(SqlKind.NOT)
+          && ((RexCall) condition).getOperands().get(0) instanceof RexInputRef
+          && isJoinConditionNotStrong(join.getCondition())
+          && !toSemi) {
+        toAnti = true;
+        continue;
+      }
+      // other forms cannot be simplified, for example, disjunction
+      return;
+    }
+    JoinRelType newJoinType = toSemi ? JoinRelType.SEMI : JoinRelType.ANTI;
+    RelNode result
+        = builder.push(join.getLeft()).push(join.getRight())
+            .join(newJoinType, join.getCondition())
+            .filter(newFilterConditions)
+            .project(project.getProjects())
+            .build();
+    call.transformTo(result);
+  }
+
+  private static boolean isJoinConditionNotStrong(RexNode condition) {
+    List<RexNode> conjunctions = conjunctions(condition);
+    for (RexNode expr : conjunctions) {
+      if (Strong.isStrong(expr)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Rule configuration. */
+  @Value.Immutable
+  public interface Config extends RelRule.Config {
+    Config DEFAULT = ImmutableMarkToSemiOrAntiJoinRule.Config.of()
+        .withOperandSupplier(b1 ->
+            b1.operand(Project.class).oneInput(b2 ->
+                b2.operand(Filter.class).oneInput(b3 ->
+                    b3.operand(Join.class).predicate(join ->
+                            join.getJoinType() == 
JoinRelType.LEFT_MARK).anyInputs())));
+
+    @Override default MarkToSemiOrAntiJoinRule toRule() {
+      return new MarkToSemiOrAntiJoinRule(this);
+    }
+  }
+
+}
diff --git 
a/core/src/main/java/org/apache/calcite/rel/rules/SubQueryRemoveRule.java 
b/core/src/main/java/org/apache/calcite/rel/rules/SubQueryRemoveRule.java
index 8ed8c287b4..37e92aa23a 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/SubQueryRemoveRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/SubQueryRemoveRule.java
@@ -41,6 +41,7 @@
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlQuantifyOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql2rel.RelDecorrelator;
@@ -1078,6 +1079,123 @@ private static void matchJoin(SubQueryRemoveRule rule, 
RelOptRuleCall call) {
     call.transformTo(builder.build());
   }
 
+  /**
+   * Match handler for a {@link Filter} whose condition contains sub-queries.
+   *
+   * <p>Pushes the filter's input, rewrites the condition with
+   * {@code applyEnableMarkJoin}, re-applies the rewritten condition as a filter
+   * and then projects {@code fields(builder, n)}, where {@code n} is the field
+   * count of the original filter (presumably to drop the columns added by the
+   * rewrite; confirm against {@code fields}).
+   *
+   * @param rule rule instance that performs the sub-query rewrite
+   * @param call rule call whose first operand is the Filter
+   */
+  private static void matchFilterEnableMarkJoin(SubQueryRemoveRule rule, RelOptRuleCall call) {
+    final Filter filter = call.rel(0);
+    final RelBuilder relBuilder = call.builder();
+    relBuilder.push(filter.getInput());
+    final List<RexNode> rewritten =
+        rule.applyEnableMarkJoin(filter.getVariablesSet(),
+            ImmutableList.of(filter.getCondition()), relBuilder);
+    // exactly one expression went in, so exactly one must come out
+    assert rewritten.size() == 1;
+    relBuilder.filter(rewritten.get(0));
+    final int originalFieldCount = filter.getRowType().getFieldCount();
+    relBuilder.project(fields(relBuilder, originalFieldCount));
+    call.transformTo(relBuilder.build());
+  }
+
+  /**
+   * Match handler for a {@link Project} whose expressions contain sub-queries.
+   *
+   * <p>Pushes the project's input, rewrites the projected expressions with
+   * {@code applyEnableMarkJoin} and re-creates the projection under the field
+   * names of the original project.
+   *
+   * @param rule rule instance that performs the sub-query rewrite
+   * @param call rule call whose first operand is the Project
+   */
+  private static void matchProjectEnableMarkJoin(SubQueryRemoveRule rule, RelOptRuleCall call) {
+    final Project project = call.rel(0);
+    final RelBuilder relBuilder = call.builder();
+    relBuilder.push(project.getInput());
+    final List<RexNode> rewrittenProjects =
+        rule.applyEnableMarkJoin(project.getVariablesSet(), project.getProjects(), relBuilder);
+    relBuilder.project(rewrittenProjects, project.getRowType().getFieldNames());
+    call.transformTo(relBuilder.build());
+  }
+
+  /**
+   * Replaces every sub-query found in {@code expressions} with an expression over
+   * the relational expression being assembled on {@code builder}.
+   *
+   * <p>Sub-queries are handled one at a time until
+   * {@link RexUtil.SubQueryFinder#find} finds none. EXISTS, IN and SOME are
+   * rewritten by {@code rewriteToMarkJoin}; scalar, collection and UNIQUE
+   * sub-queries are delegated to {@code rewriteScalarQuery},
+   * {@code rewriteCollection} and {@code rewriteUnique}.
+   *
+   * @param variablesSetOfRelNode correlation variables set by the Filter or Project
+   *                              that owns the expressions
+   * @param expressions           expressions that contain at least one sub-query
+   * @param builder               builder whose top of stack is the input of the
+   *                              owning node
+   * @return the expressions with every sub-query replaced
+   * @throws AssertionError if a sub-query of an unexpected kind is found
+   */
+  private List<RexNode> applyEnableMarkJoin(Set<CorrelationId> variablesSetOfRelNode,
+      List<RexNode> expressions, RelBuilder builder) {
+    List<RexNode> rewritten = new ArrayList<>(expressions);
+    int rewriteCount = 0;
+    for (RexSubQuery subQuery = RexUtil.SubQueryFinder.find(rewritten);
+        subQuery != null;
+        subQuery = RexUtil.SubQueryFinder.find(rewritten)) {
+      ++rewriteCount;
+      // Only keep the correlations that are defined at the current RelNode level, to avoid
+      // creating a wrong Correlate node.
+      final Set<CorrelationId> usedVariables = RelOptUtil.getVariablesUsed(subQuery.rel);
+      usedVariables.retainAll(variablesSetOfRelNode);
+
+      // field count of the current top of stack, i.e. before this sub-query is joined in
+      final int inputFieldCount = builder.peek().getRowType().getFieldCount();
+      final RexNode replacement;
+      switch (subQuery.getKind()) {
+      case EXISTS:
+      case IN:
+      case SOME:
+        // rewrite EXISTS/IN/SOME to left mark join/correlate
+        replacement = rewriteToMarkJoin(subQuery, usedVariables, builder, inputFieldCount);
+        break;
+      case SCALAR_QUERY:
+        replacement = rewriteScalarQuery(subQuery, usedVariables, builder, 1, inputFieldCount);
+        break;
+      case ARRAY_QUERY_CONSTRUCTOR:
+      case MAP_QUERY_CONSTRUCTOR:
+      case MULTISET_QUERY_CONSTRUCTOR:
+        replacement = rewriteCollection(subQuery, usedVariables, builder, 1, inputFieldCount);
+        break;
+      case UNIQUE:
+        replacement = rewriteUnique(subQuery, builder);
+        break;
+      default:
+        throw new AssertionError(subQuery.getKind());
+      }
+      final RexShuttle replacer = new ReplaceSubQueryShuttle(subQuery, replacement);
+      rewritten = replacer.apply(rewritten);
+    }
+    // callers only invoke this method for expressions that contain a sub-query
+    assert rewriteCount > 0;
+    return rewritten;
+  }
+
+  /**
+   * Rewrites an IN/SOME/EXISTS RexSubQuery into a {@link Join} of LEFT MARK type.
+   *
+   * <p>The relational expression of the sub-query is pushed onto {@code builder}
+   * and joined with the expression below it. The join condition is the
+   * conjunction of one comparison per sub-query operand, pairing the i-th operand
+   * with the i-th field of the sub-query shifted by {@code offset}. IN and EXISTS
+   * compare with {@code =}; SOME derives the operator from its comparison kind.
+   * EXISTS has no operands, so its conjunction is empty.
+   *
+   * @param e             IN/SOME/EXISTS Sub-query to rewrite
+   * @param variablesSet  A set of variables used by a relational
+   *                      expression of the specified RexSubQuery
+   * @param builder       Builder
+   * @param offset        Offset to shift {@link RexInputRef}
+   * @return  Expression that may be used to replace the RexSubQuery; it is the
+   *          last field of the join built on {@code builder}
+   * @throws IllegalArgumentException if {@code e} is not an IN/SOME/EXISTS sub-query
+   */
+  private static RexNode rewriteToMarkJoin(RexSubQuery e, Set<CorrelationId> variablesSet,
+      RelBuilder builder, int offset) {
+    builder.push(e.rel);
+    final List<RexNode> shiftedSubQueryFields = RexUtil.shift(builder.fields(), offset);
+
+    final SqlOperator comparison;
+    switch (e.getKind()) {
+    case SOME:
+      final SqlQuantifyOperator quantifyOperator = (SqlQuantifyOperator) e.op;
+      comparison = RelOptUtil.op(quantifyOperator.comparisonKind, SqlStdOperatorTable.EQUALS);
+      break;
+    case IN:
+      comparison = SqlStdOperatorTable.EQUALS;
+      break;
+    case EXISTS:
+      comparison = SqlStdOperatorTable.EQUALS;
+      assert e.getOperands().isEmpty();
+      break;
+    default:
+      throw new IllegalArgumentException("Only IN/SOME/EXISTS sub-query can be rewritten to "
+          + "left mark join, but got: " + e.getKind());
+    }
+
+    final List<RexNode> joinPredicates = new ArrayList<>();
+    for (Pair<RexNode, RexNode> operandAndField
+        : Pair.zip(e.getOperands(), shiftedSubQueryFields, false)) {
+      joinPredicates.add(
+          builder.call(comparison, operandAndField.left, operandAndField.right));
+    }
+
+    builder.join(
+        JoinRelType.LEFT_MARK,
+        RexUtil.composeConjunction(builder.getRexBuilder(), joinPredicates),
+        variablesSet);
+    return last(builder.fields());
+  }
+
   /** Shuttle that replaces occurrences of a given
    * {@link org.apache.calcite.rex.RexSubQuery} with a replacement
    * expression. */
@@ -1122,6 +1240,22 @@ public interface Config extends RelRule.Config {
                 .anyInputs())
         .withDescription("SubQueryRemoveRule:Join");
 
+    /** Configuration that matches a {@link Project} containing a sub-query and
+     * handles it with {@code matchProjectEnableMarkJoin}. */
+    Config PROJECT_ENABLE_MARK_JOIN = ImmutableSubQueryRemoveRule.Config.builder()
+        .withMatchHandler(SubQueryRemoveRule::matchProjectEnableMarkJoin)
+        .build()
+        .withOperandSupplier(operand ->
+            operand.operand(Project.class)
+                .predicate(RexUtil.SubQueryFinder::containsSubQuery)
+                .anyInputs())
+        .withDescription("SubQueryRemoveRule:ProjectEnableMarkJoin");
+
+    /** Configuration that matches a {@link Filter} containing a sub-query and
+     * handles it with {@code matchFilterEnableMarkJoin}. */
+    Config FILTER_ENABLE_MARK_JOIN = ImmutableSubQueryRemoveRule.Config.builder()
+        .withMatchHandler(SubQueryRemoveRule::matchFilterEnableMarkJoin)
+        .build()
+        .withOperandSupplier(operand ->
+            operand.operand(Filter.class)
+                .predicate(RexUtil.SubQueryFinder::containsSubQuery)
+                .anyInputs())
+        .withDescription("SubQueryRemoveRule:FilterEnableMarkJoin");
+
     @Override default SubQueryRemoveRule toRule() {
       return new SubQueryRemoveRule(this);
     }
diff --git 
a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorUtil.java 
b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorUtil.java
index ebfc011b3e..a01367057a 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorUtil.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorUtil.java
@@ -57,6 +57,7 @@
 import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Litmus;
@@ -67,6 +68,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -555,6 +557,15 @@ public static RelDataType deriveJoinRowType(
     case ANTI:
       rightType = null;
       break;
+    case LEFT_MARK:
+      final String markColName =
+          SqlValidatorUtil.uniquify("markCol", 
Sets.newHashSet(leftType.getFieldNames()),
+                  SqlValidatorUtil.EXPR_SUGGESTER);
+      rightType =
+          typeFactory.createStructType(
+              ImmutableList.of(typeFactory.createSqlType(SqlTypeName.BOOLEAN)),
+              ImmutableList.of(markColName));
+      break;
     default:
       break;
     }
diff --git 
a/core/src/main/java/org/apache/calcite/sql2rel/TopDownGeneralDecorrelator.java 
b/core/src/main/java/org/apache/calcite/sql2rel/TopDownGeneralDecorrelator.java
new file mode 100644
index 0000000000..cea5e5d5a4
--- /dev/null
+++ 
b/core/src/main/java/org/apache/calcite/sql2rel/TopDownGeneralDecorrelator.java
@@ -0,0 +1,1125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql2rel;
+
+import org.apache.calcite.linq4j.function.Experimental;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.Strong;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgram;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.rules.CoreRules;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexWindow;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlCountAggFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql2rel.RelDecorrelator.CorDef;
+import org.apache.calcite.sql2rel.RelDecorrelator.Frame;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Litmus;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
+import org.apache.calcite.util.mapping.Mappings;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.IntStream;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A top-down, generic decorrelation algorithm that can handle deep nestings of correlated
+ * subqueries and that generalizes to complex query constructs. More details are in the paper:
+ * <a href="https://dl.gi.de/items/b9df4765-d1b0-4267-a77c-4ce4ab0ee62d">
+ *   Improving Unnesting of Complex Queries</a>. It's an improved version of 
the paper:
+ * <a href="https://dl.gi.de/items/137d6917-d8fe-43aa-940b-e27da7c01625">
+ *   Unnesting Arbitrary Queries</a>.
+ *
+ * <p> Usage notes for TopDownGeneralDecorrelator:
+ *
+ * <p> TopDownGeneralDecorrelator is not yet integrated into other modules and 
needs to be called
+ * separately. If you want to use it to replace {@link RelDecorrelator}, we 
recommend:
+ *
+ * <ol>
+ *   <li>When generating the initial plan by {@link SqlToRelConverter}, do not 
remove subqueries
+ *   and do not enable decorrelation.</li>
+ *   <li>Build a {@link HepPlanner} and apply rules for removing subqueries to 
the initial
+ *   plan. With subqueries removed correctly, TopDownGeneralDecorrelator can 
in theory eliminate all
+ *   correlation. We recommend using {@link 
CoreRules#FILTER_SUB_QUERY_TO_MARK_CORRELATE}
+ *   and {@link CoreRules#PROJECT_SUB_QUERY_TO_MARK_CORRELATE} to remove 
subqueries from Filter and
+ *   Project. These rules produce LEFT MARK Join/Correlate which are better 
suited for
+ *   TopDownGeneralDecorrelator. There is not yet a corresponding, specially 
tailored rule for
+ *   Join; you may choose to use {@link 
CoreRules#JOIN_SUB_QUERY_TO_CORRELATE}. Alternatively, for
+ *   greater stability, you can run TopDownGeneralDecorrelator first and then 
apply
+ *   {@link CoreRules#JOIN_SUB_QUERY_TO_CORRELATE} together with {@link 
RelDecorrelator}.</li>
+ *   <li>Call {@link TopDownGeneralDecorrelator#decorrelateQuery(RelNode, 
RelBuilder)} to obtain
+ *   the decorrelated plan.</li>
+ *   <li>Continue with other optimizations.</li>
+ * </ol>
+ *
+ * <p> See
+ * 
<code>org.apache.calcite.test.RelOptRulesTest#testTopDownGeneralDecorrelateForFilterExists()
+ * </code> and 
<code>org.apache.calcite.test.RelOptFixture#checkPlanning(boolean)</code> for
+ * working examples.
+ */
+@Experimental
+public class TopDownGeneralDecorrelator implements ReflectiveVisitor {
+
+  private final RelBuilder builder;
+
+  // record the CorDef in the current context (including those in the parent 
Correlate).
+  // NavigableSet is used to ensure a stable iteration order.
+  private final NavigableSet<CorDef> corDefs;
+
+  // a map from RelNode to whether it contains correlated expressions (according to corDefs).
+  private final Map<RelNode, Boolean> hasCorrelatedExpressions;
+
+  // a map from RelNode to its UnnestedQuery.
+  private final Map<RelNode, UnnestedQuery> mapRelToUnnestedQuery;
+
+  private final boolean hasParent;
+
+  // the domain of the free variables (i.e. corDefs) D, it's duplicate free.
+  private DedupFreeVarsNode dedupFreeVarsNode;
+
+  // invokes using reflection a method named unnestInternal based on the
+  // runtime type of the argument.
+  @SuppressWarnings("method.invocation.invalid")
+  private final ReflectUtil.MethodDispatcher<RelNode> dispatcher =
+      ReflectUtil.createMethodDispatcher(
+          RelNode.class, getVisitor(), "unnestInternal", RelNode.class, 
boolean.class);
+
+  /**
+   * Creates a TopDownGeneralDecorrelator. If parent context arguments are 
provided,
+   * they are reused/merged into this instance.
+   *
+   * @param builder                         RelBuilder
+   * @param hasParent                       whether has parent decorrelator
+   * @param parentCorDefs                   corDefs from parent decorrelator
+   * @param parentHasCorrelatedExpressions  a map from RelNode to whether 
existing correlated
+   *                                        expressions
+   * @param parentMapRelToUnnestedQuery       a map from RelNode to its 
UnnestedQuery
+   */
+  @SuppressWarnings("initialization.fields.uninitialized")
+  private TopDownGeneralDecorrelator(
+      RelBuilder builder,
+      boolean hasParent,
+      @Nullable Set<CorDef> parentCorDefs,
+      @Nullable Map<RelNode, Boolean> parentHasCorrelatedExpressions,
+      @Nullable Map<RelNode, UnnestedQuery> parentMapRelToUnnestedQuery) {
+    this.builder = builder;
+    this.hasParent = hasParent;
+    this.corDefs = new TreeSet<>();
+    if (parentCorDefs != null) {
+      this.corDefs.addAll(parentCorDefs);
+    }
+    this.hasCorrelatedExpressions = parentHasCorrelatedExpressions == null
+        ? new HashMap<>()
+        : parentHasCorrelatedExpressions;
+    this.mapRelToUnnestedQuery = parentMapRelToUnnestedQuery == null
+        ? new HashMap<>()
+        : parentMapRelToUnnestedQuery;
+  }
+
+  /**
+   * Creates a decorrelator that has no parent and starts without any inherited
+   * correlation definitions or per-node state.
+   *
+   * @param builder RelBuilder
+   * @return a new, empty TopDownGeneralDecorrelator
+   */
+  public static TopDownGeneralDecorrelator createEmptyDecorrelator(RelBuilder builder) {
+    final boolean hasParent = false;
+    return new TopDownGeneralDecorrelator(builder, hasParent, null, null, null);
+  }
+
+  private TopDownGeneralDecorrelator createSubDecorrelator() {
+    TopDownGeneralDecorrelator subDecorrelator =
+        new TopDownGeneralDecorrelator(
+            builder,
+            true,
+            corDefs,
+            hasCorrelatedExpressions,
+            mapRelToUnnestedQuery);
+    subDecorrelator.dedupFreeVarsNode = this.dedupFreeVarsNode;
+    return subDecorrelator;
+  }
+
+  /**
+   * Decorrelates a query. This is the entry point for this class.
+   *
+   * <p>The plan is first prepared by a {@link HepPlanner} running filter
+   * push-down rules, then decorrelated, and finally post-processed by a second
+   * {@link HepPlanner} whose rules include
+   * {@link CoreRules#MARK_TO_SEMI_OR_ANTI_JOIN_RULE}. If decorrelation throws
+   * {@link UnsupportedOperationException}, the original {@code rel} is handed to
+   * the post-processing step instead, so the result may still contain correlation.
+   *
+   * @param rel     Root node of the query
+   * @param builder RelBuilder
+   * @return  Equivalent node without correlation
+   */
+  public static RelNode decorrelateQuery(RelNode rel, RelBuilder builder) {
+    final HepProgram prepareProgram = HepProgram.builder()
+        .addRuleCollection(
+            ImmutableList.of(
+                CoreRules.FILTER_PROJECT_TRANSPOSE,
+                CoreRules.FILTER_INTO_JOIN,
+                CoreRules.FILTER_CORRELATE))
+        .build();
+    final HepPlanner preparePlanner = new HepPlanner(prepareProgram);
+    preparePlanner.setRoot(rel);
+    final RelNode preparedRel = preparePlanner.findBestExp();
+
+    // start decorrelating
+    final TopDownGeneralDecorrelator decorrelator = createEmptyDecorrelator(builder);
+    RelNode decorrelated;
+    try {
+      decorrelated = decorrelator.correlateElimination(preparedRel, true);
+    } catch (UnsupportedOperationException ignored) {
+      // if the correlation exists in an unsupported operator, retain the original plan.
+      decorrelated = rel;
+    }
+
+    final HepProgram cleanupProgram = HepProgram.builder()
+        .addRuleCollection(
+            ImmutableList.of(
+                CoreRules.FILTER_PROJECT_TRANSPOSE,
+                CoreRules.FILTER_INTO_JOIN,
+                CoreRules.MARK_TO_SEMI_OR_ANTI_JOIN_RULE,
+                CoreRules.PROJECT_MERGE,
+                CoreRules.PROJECT_REMOVE))
+        .build();
+    final HepPlanner cleanupPlanner = new HepPlanner(cleanupProgram);
+    cleanupPlanner.setRoot(decorrelated);
+    return cleanupPlanner.findBestExp();
+  }
+
+  /**
+   * Eliminates Correlate.
+   *
+   * <p>For a node that is not a {@link Correlate}, recurses into the inputs and
+   * replaces them in place on {@code rel}. For a Correlate, the left side is
+   * unnested (or, when this decorrelator has no parent, decorrelated as a query
+   * of its own), a {@link CorDef} is registered for every required column, the
+   * domain D of the free variables is built from the new left side, the right
+   * side is unnested, and both sides are joined with the Correlate's join type.
+   *
+   * @param rel                           RelNode
+   * @param allowEmptyOutputFromRewrite   whether allow empty output resulting from
+   *                                      decorrelate rewriting; it is cleared for the
+   *                                      right side of a LEFT_MARK Correlate
+   * @return  Equivalent RelNode without Correlate
+   */
+  private RelNode correlateElimination(RelNode rel, boolean 
allowEmptyOutputFromRewrite) {
+    if (!(rel instanceof Correlate)) {
+      for (int i = 0; i < rel.getInputs().size(); i++) {
+        rel.replaceInput(i, correlateElimination(rel.getInput(i), 
allowEmptyOutputFromRewrite));
+      }
+      return rel;
+    }
+
+    final Correlate correlate = (Correlate) rel;
+    final RelNode newLeft;
+    if (hasParent) {
+      // if the current decorrelator has a parent, it means that the Correlate must have
+      // correlation from above.
+      assert hasCorrelatedExpressions.containsKey(correlate)
+          && hasCorrelatedExpressions.get(correlate);
+      newLeft = unnest(correlate.getLeft(), allowEmptyOutputFromRewrite);
+    } else {
+      // otherwise, start a new decorrelation for the left side.
+      newLeft = decorrelateQuery(correlate.getLeft(), builder);
+    }
+
+    // create or update UnnestedQuery of left side and corDefs of this decorrelator.
+    UnnestedQuery leftInfo = mapRelToUnnestedQuery.get(correlate.getLeft());
+    TreeMap<CorDef, Integer> corDefOutputs = new TreeMap<>();
+    Map<Integer, Integer> oldToNewOutputs = new HashMap<>();
+    for (int i = 0; i < correlate.getLeft().getRowType().getFieldCount(); i++) 
{
+      int newColumnIndex = leftInfo == null ? i : 
requireNonNull(leftInfo.oldToNewOutputs.get(i));
+      oldToNewOutputs.put(i, newColumnIndex);
+      if (correlate.getRequiredColumns().get(i)) {
+        CorDef corDef = new CorDef(correlate.getCorrelationId(), i);
+        corDefs.add(corDef);
+        corDefOutputs.put(corDef, newColumnIndex);
+      }
+    }
+    if (leftInfo != null) {
+      corDefOutputs.putAll(leftInfo.corDefOutputs);
+    }
+    leftInfo = new UnnestedQuery(correlate.getLeft(), newLeft, corDefOutputs, 
oldToNewOutputs);
+    dedupFreeVarsNode = DedupFreeVarsNode.create(newLeft, leftInfo, corDefs, 
builder);
+
+    // decorrelate right side
+    detectCorrelatedExpressions(correlate.getRight());
+    allowEmptyOutputFromRewrite &= correlate.getJoinType() != 
JoinRelType.LEFT_MARK;
+    RelNode newRight = unnest(correlate.getRight(), 
allowEmptyOutputFromRewrite);
+    UnnestedQuery rightInfo = 
requireNonNull(mapRelToUnnestedQuery.get(correlate.getRight()));
+
+    // rewrite condition, adding the natural join condition between the left side and
+    // the domain D that is produced from the right side. This is the fundamental equation
+    // from the paper Improving Unnesting of Complex Queries, shown in Section 2.2
+    //
+    //         Correlate(condition=[p])
+    //           /   \
+    //          L     ... with correlation
+    //                  \
+    //                   R
+    // =>
+    //         Join(condition=[p AND (L is not distinct from D)])
+    //          /   \
+    //         L     ... without correlation
+    //                \
+    //                 x
+    //                / \
+    //               D   R
+    builder.push(newLeft).push(newRight);
+    RexNode unnestedJoinCondition =
+        UnnestedQuery.createUnnestedJoinCondition(correlate.getCondition(), 
leftInfo, rightInfo,
+            true, builder, corDefs);
+    RelNode unnestedRel = builder.join(correlate.getJoinType(), 
unnestedJoinCondition).build();
+
+    if (!hasParent) {
+      // ensure that the fields are in the same order as in the original plan.
+      // NOTE(review): this relies on oldToNewOutputs.values() iterating in old-field
+      // order; confirm that the map built by createJoinUnnestInfo guarantees it.
+      builder.push(unnestedRel);
+      UnnestedQuery unnestedQuery =
+          UnnestedQuery.createJoinUnnestInfo(
+              leftInfo,
+              rightInfo,
+              correlate,
+              unnestedRel,
+              correlate.getJoinType());
+      List<RexNode> projects
+          = builder.fields(new 
ArrayList<>(unnestedQuery.oldToNewOutputs.values()));
+      unnestedRel = builder.project(projects).build();
+    }
+    return unnestedRel;
+  }
+
+  /**
+   * Detects whether any expression in the relational tree rooted at {@code rel} refers
+   * to any variables that appear in {@link #corDefs} and populates
+   * {@link #hasCorrelatedExpressions} for {@code rel} and all of its descendants.
+   *
+   * <p> It is necessary to detect correlation for every node, for example:
+   *
+   * <blockquote><pre>
+   *      Union
+   *    /   |   \
+   *  r1    r2   r3     all with correlation
+   *  |     |     |
+   * r11   r22   r33    all without correlation
+   * </pre></blockquote>
+   *
+   * <p> If we stopped after detecting correlation in the r1 branch, we would lose the
+   * correlation information for the r2 and r3 branches. Without that information we
+   * cannot know the correct stopping point when pushing down D to the r2/r3 branches.
+   * In addition, accurately knowing the correlation of each input enables useful
+   * optimizations when pushing down D to Join.
+   *
+   * @param rel RelNode
+   * @return true when there are correlated expressions in {@code rel} or below it
+   */
+  private boolean detectCorrelatedExpressions(RelNode rel) {
+    if (!hasParent && hasCorrelatedExpressions.containsKey(rel)) {
+      // for shared sub-trees, check the map hasCorrelatedExpressions first. However, this is
+      // only valid when there is no parent decorrelator. For example:
+      //    Correlate0 => cor0
+      //   /        \
+      //  r1        r2
+      //             \
+      //          Correlate1 => cor1
+      //          /        \
+      //   r3 with cor0    r5 with cor1
+      //         /           \
+      //       r4            r6
+      // For decorrelator-0 (of Correlate0), r5 has no correlation. Decorrelator-1 (of
+      // Correlate1) is constructed with the information merged from decorrelator-0, so at
+      // that point r5 still has no correlation. Once decorrelator-1 has decorrelated the
+      // left side of Correlate1, it needs to detect the correlation on the right side of
+      // Correlate1 (based on cor0 and cor1). Now r5 has correlation, and its value in
+      // hasCorrelatedExpressions changes from FALSE to TRUE.
+      return hasCorrelatedExpressions.get(rel);
+    }
+    boolean hasCorrelation = false;
+    // visit every input (no short-circuit) so that each descendant gets its own entry
+    for (RelNode input : rel.getInputs()) {
+      hasCorrelation |= detectCorrelatedExpressions(input);
+    }
+    if (!hasCorrelation) {
+      // no input is correlated: check the variables used by this node itself
+      RelOptUtil.VariableUsedVisitor variableUsedVisitor =
+          new RelOptUtil.VariableUsedVisitor(null);
+      rel.accept(variableUsedVisitor);
+      Set<CorrelationId> corrIdSet
+          = corDefs.stream()
+          .map(corDef -> corDef.corr)
+          .collect(ImmutableSet.toImmutableSet());
+      hasCorrelation =
+          !variableUsedVisitor.variables.isEmpty()
+              && !Collections.disjoint(corrIdSet, 
variableUsedVisitor.variables);
+    }
+    hasCorrelatedExpressions.put(rel, hasCorrelation);
+    return hasCorrelation;
+  }
+
+  /**
+   * Unnests a RelNode. If there is no correlation in the node, creates the cross product
+   * with domain D; otherwise, dispatches to the specific {@code unnestInternal} method to
+   * push down D based on the type of rel.
+   *
+   * <p>In the uncorrelated case the node is first decorrelated as a query of its own, the
+   * fields of D are appended after the node's own fields, and the resulting
+   * {@link UnnestedQuery} is recorded in {@link #mapRelToUnnestedQuery}.
+   *
+   * @param rel                           RelNode; it must already have an entry in
+   *                                      {@link #hasCorrelatedExpressions}
+   * @param allowEmptyOutputFromRewrite   whether allow empty output resulting from
+   *                                      decorrelate rewriting.
+   * @return new node (contains domain D) without correlation
+   */
+  private RelNode unnest(RelNode rel, boolean allowEmptyOutputFromRewrite) {
+    if (!requireNonNull(hasCorrelatedExpressions.get(rel))) {
+      RelNode newRel
+          = builder.push(decorrelateQuery(rel, builder))
+              .push(dedupFreeVarsNode.r)
+              .join(JoinRelType.INNER)
+              .build();
+      // the original fields keep their positions
+      Map<Integer, Integer> oldToNewOutputs = new HashMap<>();
+      IntStream.range(0, rel.getRowType().getFieldCount())
+          .forEach(i -> oldToNewOutputs.put(i, i));
+
+      // the fields of D follow, one per CorDef, in the iteration order of corDefs
+      int offset = rel.getRowType().getFieldCount();
+      TreeMap<CorDef, Integer> corDefOutputs = new TreeMap<>();
+      for (CorDef corDef : corDefs) {
+        corDefOutputs.put(corDef, offset++);
+      }
+
+      UnnestedQuery unnestedQuery
+          = new UnnestedQuery(rel, newRel, corDefOutputs, oldToNewOutputs);
+      mapRelToUnnestedQuery.put(rel, unnestedQuery);
+      return newRel;
+    }
+    return dispatcher.invoke(rel, allowEmptyOutputFromRewrite);
+  }
+
+  /**
+   * Unnests a correlated {@link Filter}.
+   *
+   * <p>If {@code tryReplaceFreeVarsToInputRef} succeeds, the new conditions are applied
+   * directly on the filter's original input and D is not pushed down. Otherwise the
+   * input is unnested and the condition is rewritten against it. In both cases the
+   * resulting {@link UnnestedQuery} is recorded for {@code filter}.
+   *
+   * @param filter                        Filter to unnest
+   * @param allowEmptyOutputFromRewrite   whether allow empty output resulting from
+   *                                      decorrelate rewriting.
+   * @return new Filter without correlation
+   */
+  public RelNode unnestInternal(Filter filter, boolean 
allowEmptyOutputFromRewrite) {
+    Map<Integer, Integer> oldToNewOutputs = new HashMap<>();
+    TreeMap<CorDef, Integer> corDefOutputs = new TreeMap<>();
+    List<RexNode> newConditions = new ArrayList<>();
+    // try to replace all free variables to input refs according to equi-conditions
+    if (tryReplaceFreeVarsToInputRef(filter, corDefOutputs, newConditions)) {
+      // all free variables can be replaced, no need to push down D, that is, D is eliminated.
+      builder.push(filter.getInput()).filter(newConditions);
+      for (int i = 0; i < filter.getRowType().getFieldCount(); i++) {
+        oldToNewOutputs.put(i, i);
+      }
+    } else {
+      // push down D
+      RelNode newInput = unnest(filter.getInput(), 
allowEmptyOutputFromRewrite);
+      UnnestedQuery inputInfo = 
requireNonNull(mapRelToUnnestedQuery.get(filter.getInput()));
+      RexNode newCondition =
+          CorrelatedExprRewriter.rewrite(filter.getCondition(), inputInfo);
+      builder.push(newInput).filter(newCondition);
+      oldToNewOutputs = inputInfo.oldToNewOutputs;
+      corDefOutputs.putAll(inputInfo.corDefOutputs);
+    }
+    RelNode newFilter = builder.build();
+    UnnestedQuery unnestedQuery =
+        new UnnestedQuery(filter, newFilter, corDefOutputs, oldToNewOutputs);
+    mapRelToUnnestedQuery.put(filter, unnestedQuery);
+    return newFilter;
+  }
+
+  /**
+   * Unnests a correlated {@link Project}.
+   *
+   * <p>The input is unnested first; empty output from the rewrite is only allowed for
+   * the input if it was allowed for this node and every projected expression is strong
+   * according to {@link Strong#isStrong(RexNode)}. The projections are then rewritten
+   * against the unnested input, and one field per {@link CorDef} is appended after them.
+   * The resulting {@link UnnestedQuery} is recorded for {@code project}.
+   *
+   * @param project                       Project to unnest
+   * @param allowEmptyOutputFromRewrite   whether allow empty output resulting from
+   *                                      decorrelate rewriting.
+   * @return new Project without correlation
+   */
+  public RelNode unnestInternal(Project project, boolean 
allowEmptyOutputFromRewrite) {
+    for (RexNode expr : project.getProjects()) {
+      if (!allowEmptyOutputFromRewrite) {
+        break;
+      }
+      allowEmptyOutputFromRewrite &= Strong.isStrong(expr);
+    }
+    RelNode newInput = unnest(project.getInput(), allowEmptyOutputFromRewrite);
+    UnnestedQuery inputInfo = 
requireNonNull(mapRelToUnnestedQuery.get(project.getInput()));
+    List<RexNode> newProjects
+        = CorrelatedExprRewriter.rewrite(project.getProjects(), inputInfo);
+
+    // the original projections keep their positions
+    int oriFieldCount = newProjects.size();
+    Map<Integer, Integer> oldToNewOutputs = new HashMap<>();
+    IntStream.range(0, oriFieldCount).forEach(i -> oldToNewOutputs.put(i, i));
+
+    // append the fields of D taken from the unnested input, one per CorDef
+    builder.push(newInput);
+    TreeMap<CorDef, Integer> corDefOutputs = new TreeMap<>();
+    for (CorDef corDef : corDefs) {
+      
newProjects.add(builder.field(requireNonNull(inputInfo.corDefOutputs.get(corDef))));
+      corDefOutputs.put(corDef, oriFieldCount++);
+    }
+    RelNode newProject = builder.project(newProjects, ImmutableList.of(), 
true).build();
+    UnnestedQuery unnestedQuery
+        = new UnnestedQuery(project, newProject, corDefOutputs, 
oldToNewOutputs);
+    mapRelToUnnestedQuery.put(project, unnestedQuery);
+    return newProject;
+  }
+
+  public RelNode unnestInternal(Aggregate aggregate, boolean 
allowEmptyOutputFromRewrite) {
+    RelNode newInput = unnest(aggregate.getInput(), 
allowEmptyOutputFromRewrite);
+    UnnestedQuery inputUnnestedQuery =
+        requireNonNull(mapRelToUnnestedQuery.get(aggregate.getInput()));
+    builder.push(newInput);
+
+    // create new groupSet and groupSets, adding the fields in D to group keys
+    ImmutableBitSet.Builder corKeyBuilder = ImmutableBitSet.builder();
+    for (CorDef corDef : corDefs) {
+      int corKeyIndex = 
requireNonNull(inputUnnestedQuery.corDefOutputs.get(corDef));
+      corKeyBuilder.set(corKeyIndex);
+    }
+    ImmutableBitSet corKeyBitSet = corKeyBuilder.build();
+    ImmutableBitSet newGroupSet
+        = aggregate.getGroupSet().permute(inputUnnestedQuery.oldToNewOutputs)
+            .union(corKeyBitSet);
+    List<ImmutableBitSet> newGroupSets = new ArrayList<>();
+    for (ImmutableBitSet bitSet : aggregate.getGroupSets()) {
+      ImmutableBitSet newBitSet
+          = 
bitSet.permute(inputUnnestedQuery.oldToNewOutputs).union(corKeyBitSet);
+      newGroupSets.add(newBitSet);
+    }
+
+    // create new aggregate functions
+    boolean hasCountFunction = false;
+    List<AggregateCall> permutedAggCalls = new ArrayList<>();
+    Mappings.TargetMapping targetMapping =
+        Mappings.target(
+            inputUnnestedQuery.oldToNewOutputs,
+            inputUnnestedQuery.oldRel.getRowType().getFieldCount(),
+            inputUnnestedQuery.r.getRowType().getFieldCount());
+    for (AggregateCall aggCall : aggregate.getAggCallList()) {
+      hasCountFunction |= aggCall.getAggregation() instanceof 
SqlCountAggFunction;
+      permutedAggCalls.add(aggCall.transform(targetMapping));
+    }
+    // create new Aggregate node
+    RelNode newAggregate
+        = builder.aggregate(builder.groupKey(newGroupSet, newGroupSets), 
permutedAggCalls).build();
+
+    // create UnnestedQuery
+    Map<Integer, Integer> oldToNewOutputs = new HashMap<>();
+    for (int groupKey : aggregate.getGroupSet()) {
+      int oriIndex = aggregate.getGroupSet().indexOf(groupKey);
+      int newIndex = newGroupSet.indexOf(groupKey);
+      oldToNewOutputs.put(oriIndex, newIndex);
+    }
+    for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
+      oldToNewOutputs.put(
+          aggregate.getGroupCount() + i,
+          newGroupSet.cardinality() + i);
+    }
+    TreeMap<CorDef, Integer> corDefOutputs = new TreeMap<>();
+    for (CorDef corDef : corDefs) {
+      int index = requireNonNull(inputUnnestedQuery.corDefOutputs.get(corDef));
+      corDefOutputs.put(corDef, newGroupSet.indexOf(index));
+    }
+
+    if (aggregate.hasEmptyGroup()
+        && (!allowEmptyOutputFromRewrite || hasCountFunction)) {
+      // create a left join with D to avoid rewriting from non-empty to empty 
output
+      builder.push(dedupFreeVarsNode.r).push(newAggregate);
+      List<RexNode> leftJoinConditions = new ArrayList<>();
+      int freeVarsIndex = 0;
+      for (CorDef corDef : corDefs) {
+        RexNode notDistinctFrom =
+            builder.isNotDistinctFrom(
+                builder.field(2, 0, freeVarsIndex),
+                builder.field(2, 1, 
requireNonNull(corDefOutputs.get(corDef))));
+        leftJoinConditions.add(notDistinctFrom);
+
+        corDefOutputs.put(corDef, freeVarsIndex++);
+      }
+      builder.join(JoinRelType.LEFT, leftJoinConditions);
+
+      // replace the reference to COUNT with CASE WHEN COUNT(*) IS NULL THEN 0 
ELSE COUNT(*) END
+      List<RexNode> aggCallProjects = new ArrayList<>();
+      final int aggCallStartIndex =
+          dedupFreeVarsNode.r.getRowType().getFieldCount() + 
newGroupSet.cardinality();
+      for (int i = 0; i < permutedAggCalls.size(); i++) {
+        int index = aggCallStartIndex + i;
+        SqlAggFunction aggregation = permutedAggCalls.get(i).getAggregation();
+        if (aggregation instanceof SqlCountAggFunction) {
+          RexNode caseWhenRewrite =
+              builder.call(
+                  SqlStdOperatorTable.CASE,
+                  builder.isNotNull(builder.field(index)),
+                  builder.field(index),
+                  builder.literal(0));
+          aggCallProjects.add(caseWhenRewrite);
+        } else {
+          aggCallProjects.add(builder.field(index));
+        }
+      }
+      List<RexNode> projects =
+          new ArrayList<>(builder.fields(ImmutableBitSet.range(0, 
aggCallStartIndex)));
+      projects.addAll(aggCallProjects);
+      newAggregate = builder.project(projects).build();
+
+
+      for (Map.Entry<Integer, Integer> entry : oldToNewOutputs.entrySet()) {
+        int value = requireNonNull(entry.getValue());
+        entry.setValue(value + corDefs.size());
+      }
+    }
+    UnnestedQuery unnestedQuery
+        = new UnnestedQuery(aggregate, newAggregate, corDefOutputs, 
oldToNewOutputs);
+    mapRelToUnnestedQuery.put(aggregate, unnestedQuery);
+    return newAggregate;
+  }
+
+  /**
+   * Unnests a {@link Sort}.
+   *
+   * <p>The input is unnested first and the collation is remapped onto the row type of the new
+   * input. A Sort without OFFSET/FETCH is kept as a plain sort. A Sort with OFFSET and/or FETCH
+   * (with or without ORDER BY) is rewritten, because after unnesting the limit has to be
+   * enforced per value of the outer bindings instead of globally: a {@code ROW_NUMBER()} window
+   * partitioned by the domain D (and ordered by the original collation, if any) is projected as
+   * an extra trailing column and a Filter on that column replaces the limit. See section 4.4 in
+   * paper Improving Unnesting of Complex Queries.
+   *
+   * <p>In the rewritten case the ROW_NUMBER column stays in the output as the last field; the
+   * recorded output mappings are those of the input, which remain valid because the extra
+   * column is appended at the end.
+   *
+   * @param sort                         Sort node to unnest
+   * @param allowEmptyOutputFromRewrite  passed through to the unnesting of the input
+   * @return the unnested replacement of the Sort
+   */
+  public RelNode unnestInternal(Sort sort, boolean allowEmptyOutputFromRewrite) {
+    RelNode newInput = unnest(sort.getInput(), allowEmptyOutputFromRewrite);
+    UnnestedQuery inputInfo =
+        requireNonNull(mapRelToUnnestedQuery.get(sort.getInput()));
+    Mappings.TargetMapping targetMapping =
+        Mappings.target(
+            inputInfo.oldToNewOutputs,
+            inputInfo.oldRel.getRowType().getFieldCount(),
+            inputInfo.r.getRowType().getFieldCount());
+    RelCollation shiftCollation = sort.getCollation().apply(targetMapping);
+    builder.push(newInput);
+
+    if (sort.offset != null || sort.fetch != null) {
+      // A LIMIT or OFFSET has to be changed during rewriting because now the limit has to be
+      // enforced per value of the outer bindings instead of globally. This holds whether or
+      // not there is an ORDER BY: a global limit would cap the rows of all bindings together.
+      // It is rewritten using the ROW_NUMBER() window function and filtering on it.
+      List<RexNode> partitionKeys = new ArrayList<>();
+      for (CorDef corDef : corDefs) {
+        int partitionKeyIndex = requireNonNull(inputInfo.corDefOutputs.get(corDef));
+        partitionKeys.add(builder.field(partitionKeyIndex));
+      }
+      // the ORDER BY list of the window is empty when the Sort has no collation
+      RexNode rowNumber = builder.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
+          .over()
+          .partitionBy(partitionKeys)
+          .orderBy(builder.fields(shiftCollation))
+          .toRex();
+      List<RexNode> projectsWithRowNumber = new ArrayList<>(builder.fields());
+      projectsWithRowNumber.add(rowNumber);
+      builder.project(projectsWithRowNumber);
+
+      // ROW_NUMBER is 1-based: keep rows with offset < rn <= offset + fetch
+      final int rowNumberIndex = projectsWithRowNumber.size() - 1;
+      List<RexNode> conditions = new ArrayList<>();
+      if (sort.offset != null) {
+        RexNode greaterThanLowerBound =
+            builder.call(
+                SqlStdOperatorTable.GREATER_THAN,
+                builder.field(rowNumberIndex),
+                sort.offset);
+        conditions.add(greaterThanLowerBound);
+      }
+      if (sort.fetch != null) {
+        RexNode upperBound = sort.offset == null
+            ? sort.fetch
+            : builder.call(SqlStdOperatorTable.PLUS, sort.offset, sort.fetch);
+        RexNode lessThanOrEqualUpperBound =
+            builder.call(
+                SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+                builder.field(rowNumberIndex),
+                upperBound);
+        conditions.add(lessThanOrEqualUpperBound);
+      }
+      builder.filter(conditions);
+    } else {
+      // pure ORDER BY: no limit to enforce, keep it as a sort on the remapped collation
+      builder.sortLimit(sort.offset, sort.fetch, builder.fields(shiftCollation));
+    }
+    RelNode newSort = builder.build();
+    UnnestedQuery unnestedQuery
+        = new UnnestedQuery(sort, newSort, inputInfo.corDefOutputs, inputInfo.oldToNewOutputs);
+    mapRelToUnnestedQuery.put(sort, unnestedQuery);
+    return newSort;
+  }
+
+  /**
+   * Unnests a {@link Correlate} that is nested inside the query currently being unnested.
+   *
+   * <p>The work is delegated to a sub-decorrelator created from this decorrelator; the
+   * unnesting information it recorded for both inputs is then combined and registered for
+   * the Correlate in this decorrelator.
+   *
+   * @param correlate                    nested Correlate node
+   * @param allowEmptyOutputFromRewrite  passed through to the sub-decorrelator
+   * @return the Join that replaces the Correlate
+   */
+  public RelNode unnestInternal(Correlate correlate, boolean allowEmptyOutputFromRewrite) {
+    // a nested Correlate that still has correlation is handled by a sub-decorrelator into
+    // which this decorrelator's context is merged
+    final TopDownGeneralDecorrelator nested = createSubDecorrelator();
+    final Join decorrelatedJoin =
+        (Join) nested.correlateElimination(correlate, allowEmptyOutputFromRewrite);
+
+    // both inputs were registered by the sub-decorrelator, not by this one
+    final UnnestedQuery leftSide =
+        requireNonNull(nested.mapRelToUnnestedQuery.get(correlate.getLeft()));
+    final UnnestedQuery rightSide =
+        requireNonNull(nested.mapRelToUnnestedQuery.get(correlate.getRight()));
+
+    mapRelToUnnestedQuery.put(
+        correlate,
+        UnnestedQuery.createJoinUnnestInfo(
+            leftSide,
+            rightSide,
+            correlate,
+            decorrelatedJoin,
+            correlate.getJoinType()));
+    return decorrelatedJoin;
+  }
+
+  /**
+   * Unnests a {@link Join}.
+   *
+   * <p>For each side it is decided whether the domain D has to be pushed down into it. A side
+   * that does not need D is decorrelated independently through {@code decorrelateQuery} and
+   * gets an identity output mapping with no CorDef outputs. A side that needs D is unnested
+   * recursively. If D was pushed to both sides, the new join condition additionally matches
+   * the D columns of both sides.
+   *
+   * @param join                         Join node to unnest
+   * @param allowEmptyOutputFromRewrite  passed through to the unnesting of the inputs; it is
+   *                                     cleared for the right input of a LEFT_MARK join
+   * @return the unnested replacement of the Join
+   */
+  public RelNode unnestInternal(Join join, boolean 
allowEmptyOutputFromRewrite) {
+    boolean leftHasCorrelation =
+        requireNonNull(hasCorrelatedExpressions.get(join.getLeft()));
+    boolean rightHasCorrelation =
+        requireNonNull(hasCorrelatedExpressions.get(join.getRight()));
+    boolean pushDownToLeft = false;
+    boolean pushDownToRight = false;
+    RelNode newLeft;
+    RelNode newRight;
+    UnnestedQuery leftInfo;
+    UnnestedQuery rightInfo;
+
+    if (!leftHasCorrelation && !join.getJoinType().generatesNullsOnRight()
+        && join.getJoinType().projectsRight()) {
+      // there is no need to push down domain D to left side when both following conditions
+      // are satisfied:
+      // 1. there is no correlation on left side
+      // 2. join type will not generate NULL values on right side and will project right
+      // In this case, the left side will start a decorrelation independently
+      newLeft = decorrelateQuery(join.getLeft(), builder);
+      // identity mapping: the field positions of the left side are recorded unchanged
+      Map<Integer, Integer> leftOldToNewOutputs = new HashMap<>();
+      IntStream.range(0, newLeft.getRowType().getFieldCount())
+          .forEach(i -> leftOldToNewOutputs.put(i, i));
+      leftInfo = new UnnestedQuery(join.getLeft(), newLeft, new TreeMap<>(), 
leftOldToNewOutputs);
+    } else {
+      newLeft = unnest(join.getLeft(), allowEmptyOutputFromRewrite);
+      pushDownToLeft = true;
+      leftInfo = requireNonNull(mapRelToUnnestedQuery.get(join.getLeft()));
+    }
+    if (!rightHasCorrelation && !join.getJoinType().generatesNullsOnLeft()) {
+      // there is no need to push down domain D to right side when both following conditions
+      // are satisfied:
+      // 1. there is no correlation on right side
+      // 2. join type will not generate NULL values on left side
+      // In this case, the right side will start a decorrelation independently
+      newRight = decorrelateQuery(join.getRight(), builder);
+      // identity mapping: the field positions of the right side are recorded unchanged
+      Map<Integer, Integer> rightOldToNewOutputs = new HashMap<>();
+      IntStream.range(0, newRight.getRowType().getFieldCount())
+          .forEach(i -> rightOldToNewOutputs.put(i, i));
+      rightInfo =
+          new UnnestedQuery(join.getRight(), newRight, new TreeMap<>(), 
rightOldToNewOutputs);
+    } else {
+      // the flag is cleared when the join type is LEFT_MARK
+      // NOTE(review): presumably because the mark must be computed for every binding, so the
+      // right side may not lose bindings during the rewrite - confirm
+      allowEmptyOutputFromRewrite &= join.getJoinType() != 
JoinRelType.LEFT_MARK;
+      newRight = unnest(join.getRight(), allowEmptyOutputFromRewrite);
+      pushDownToRight = true;
+      rightInfo = requireNonNull(mapRelToUnnestedQuery.get(join.getRight()));
+    }
+
+    builder.push(newLeft).push(newRight);
+    // if domain D is pushed down to both sides, the new join condition needs to add the
+    // natural condition between the D columns of both sides
+    RexNode newJoinCondition =
+        UnnestedQuery.createUnnestedJoinCondition(
+            join.getCondition(),
+            leftInfo,
+            rightInfo,
+            pushDownToLeft && pushDownToRight,
+            builder,
+            corDefs);
+    RelNode newJoin = builder.join(join.getJoinType(), 
newJoinCondition).build();
+    UnnestedQuery unnestedQuery =
+        UnnestedQuery.createJoinUnnestInfo(
+            leftInfo,
+            rightInfo,
+            join,
+            newJoin,
+            join.getJoinType());
+    mapRelToUnnestedQuery.put(join, unnestedQuery);
+    return newJoin;
+  }
+
+  /**
+   * Unnests a {@link SetOp} (UNION, INTERSECT or EXCEPT).
+   *
+   * <p>The domain D is pushed into every input. Each unnested input is then projected to the
+   * common layout [original fields in their original order, the columns of D], so that all
+   * inputs of the new set operation keep a consistent row type.
+   *
+   * @param setOp                        set operation to unnest
+   * @param allowEmptyOutputFromRewrite  passed through to the unnesting of the inputs
+   * @return the unnested replacement of the set operation
+   */
+  public RelNode unnestInternal(SetOp setOp, boolean allowEmptyOutputFromRewrite) {
+    final List<RelNode> alignedInputs = new ArrayList<>();
+    for (RelNode input : setOp.getInputs()) {
+      // push down the domain D into this input
+      final RelNode unnestedInput = unnest(input, allowEmptyOutputFromRewrite);
+      builder.push(unnestedInput);
+      final UnnestedQuery info = requireNonNull(mapRelToUnnestedQuery.get(input));
+
+      // bring the rewritten input into the common layout:
+      // [original fields that maintain their original order, the domain D]
+      final List<Integer> ordinals = new ArrayList<>();
+      final int inputFieldCount = info.oldRel.getRowType().getFieldCount();
+      for (int i = 0; i < inputFieldCount; i++) {
+        ordinals.add(requireNonNull(info.oldToNewOutputs.get(i)));
+      }
+      for (CorDef corDef : corDefs) {
+        ordinals.add(requireNonNull(info.corDefOutputs.get(corDef)));
+      }
+      builder.project(builder.fields(ordinals));
+      alignedInputs.add(builder.build());
+    }
+
+    final int arity = alignedInputs.size();
+    builder.pushAll(alignedInputs);
+    switch (setOp.kind) {
+    case UNION:
+      builder.union(setOp.all, arity);
+      break;
+    case INTERSECT:
+      builder.intersect(setOp.all, arity);
+      break;
+    case EXCEPT:
+      builder.minus(setOp.all, arity);
+      break;
+    default:
+      throw new AssertionError("Not a set op: " + setOp);
+    }
+    final RelNode newSetOp = builder.build();
+
+    // original fields keep their positions, the columns of D follow directly after them
+    final int originalFieldCount = setOp.getRowType().getFieldCount();
+    final Map<Integer, Integer> identityOutputs = new HashMap<>();
+    for (int i = 0; i < originalFieldCount; i++) {
+      identityOutputs.put(i, i);
+    }
+    final TreeMap<CorDef, Integer> domainOutputs = new TreeMap<>();
+    int nextOrdinal = originalFieldCount;
+    for (CorDef corDef : corDefs) {
+      domainOutputs.put(corDef, nextOrdinal);
+      nextOrdinal++;
+    }
+    mapRelToUnnestedQuery.put(
+        setOp,
+        new UnnestedQuery(setOp, newSetOp, domainOutputs, identityOutputs));
+    return newSetOp;
+  }
+
+  /**
+   * Fallback for relational operators that have no dedicated unnesting implementation.
+   *
+   * @param other  the unsupported node
+   * @return never returns normally
+   * @throws UnsupportedOperationException always, naming the simple class name of the node
+   */
+  public RelNode unnestInternal(RelNode other) {
+    final String nodeKind = other.getClass().getSimpleName();
+    throw new UnsupportedOperationException(
+        "Top-down general decorrelator does not support: " + nodeKind);
+  }
+
+  /**
+   * Tries to replace all free variables (i.e. the attributes of domain D) with RexInputRef.
+   * When the decorrelation process reaches:
+   *
+   * <pre>{@code
+   *              Filter    with correlation
+   *                |
+   *              Input     without correlation
+   * }</pre>
+   *
+   * <p>It will introduce the domain D by creating a cross product with Input. However, if all
+   * free variables in the condition are filtered using equality conditions with local
+   * attributes, then we can instead derive the domain from the local attributes. This
+   * substitution results in a superset (compared to creating a cross product between D and
+   * input), because the filter effect of the equality conditions is removed. However, this
+   * does not affect the final result, because the filter will still happen at a later stage
+   * (at the original Correlate). Although this substitution will result in more intermediate
+   * results, we assume that introducing a join is more costly. See section 3.3 in paper
+   * Unnesting Arbitrary Queries.
+   *
+   * <p>The substitution is refused when the same free variable is equated with two different
+   * local attributes (e.g. {@code a = $cor.x AND b = $cor.x}): only one attribute can stand in
+   * for the free variable, and dropping the other equality would lose the implied constraint
+   * between the two attributes.
+   *
+   * <p>Both {@code corDefToInputIndex} and {@code newConditions} are filled while the
+   * conditions are scanned; they may be partially filled when this method returns false.
+   *
+   * @param filter              Filter node
+   * @param corDefToInputIndex  a map from CorDef to input index, filled by this method
+   * @param newConditions       new conditions after replacing free variables, filled by this
+   *                            method
+   * @return true when all free variables are replaced
+   */
+  private boolean tryReplaceFreeVarsToInputRef(
+      Filter filter,
+      Map<CorDef, Integer> corDefToInputIndex,
+      List<RexNode> newConditions) {
+    if (requireNonNull(hasCorrelatedExpressions.get(filter.getInput()))) {
+      return false;
+    }
+    List<RexNode> oriConditions = RelOptUtil.conjunctions(filter.getCondition());
+    for (RexNode condition : oriConditions) {
+      if (!RexUtil.containsCorrelation(condition)) {
+        // uncorrelated conjuncts are kept as they are
+        newConditions.add(condition);
+        continue;
+      }
+      Pair<CorDef, RexInputRef> pair = getPairOfFreeVarAndInputRefInEqui(condition);
+      if (pair == null) {
+        // if the condition is correlated but it's not an equi-condition between free variable
+        // and input ref, then it cannot be replaced.
+        return false;
+      }
+      int inputIndex = pair.right.getIndex();
+      Integer knownIndex = corDefToInputIndex.get(pair.left);
+      if (knownIndex != null && knownIndex != inputIndex) {
+        // the free variable is already represented by another input column; replacing it
+        // again would silently drop the equality between the two columns.
+        return false;
+      }
+      // equi-condition will filter NULL values, so need to add IS NOT NULL for input ref
+      if (condition.isA(SqlKind.EQUALS)) {
+        newConditions.add(builder.isNotNull(pair.right));
+      }
+      corDefToInputIndex.put(pair.left, inputIndex);
+    }
+    Set<CorDef> replacedCorDef = corDefToInputIndex.keySet();
+    // ensure all free variables can be replaced
+    return replacedCorDef.size() == corDefs.size() && corDefs.containsAll(replacedCorDef);
+  }
+
+  private @Nullable Pair<CorDef, RexInputRef> 
getPairOfFreeVarAndInputRefInEqui(RexNode condition) {
+    if (!condition.isA(SqlKind.EQUALS) && 
!condition.isA(SqlKind.IS_NOT_DISTINCT_FROM)) {
+      return null;
+    }
+    RexCall equiCond = (RexCall) condition;
+    RexNode left = equiCond.getOperands().get(0);
+    RexNode right = equiCond.getOperands().get(1);
+    CorDef leftCorDef = unwrapCorDef(left);
+    CorDef rightCorDef = unwrapCorDef(right);
+    if (left instanceof RexInputRef && rightCorDef != null) {
+      return Pair.of(rightCorDef, (RexInputRef) left);
+    }
+    if (right instanceof RexInputRef && leftCorDef != null) {
+      return Pair.of(leftCorDef, (RexInputRef) right);
+    }
+    return null;
+  }
+
+  /**
+   * Extracts the CorDef from an expression that is a field access on a correlation variable.
+   *
+   * @param expr  expression to inspect
+   * @return the CorDef when the expression is such a field access and the CorDef belongs to
+   *         the CorDefs of this decorrelator, otherwise null
+   */
+  private @Nullable CorDef unwrapCorDef(RexNode expr) {
+    if (!(expr instanceof RexFieldAccess)) {
+      return null;
+    }
+    final RexFieldAccess access = (RexFieldAccess) expr;
+    final RexNode reference = access.getReferenceExpr();
+    if (!(reference instanceof RexCorrelVariable)) {
+      return null;
+    }
+    final RexCorrelVariable correlVariable = (RexCorrelVariable) reference;
+    final CorDef candidate = new CorDef(correlVariable.id, access.getField().getIndex());
+    if (corDefs.contains(candidate)) {
+      return candidate;
+    }
+    return null;
+  }
+
+  /**
+   * Rewrites correlated expressions, window functions and shifts input references.
+   *
+   * <p>Input references are remapped through {@code oldToNewOutputs} of the given
+   * UnnestedQuery, field accesses on correlation variables are replaced with input references
+   * taken from its {@code corDefOutputs}, and every window gets the columns of
+   * {@code corDefOutputs} appended to its partition keys.
+   */
+  static class CorrelatedExprRewriter extends RexShuttle {
+    final UnnestedQuery unnestedQuery;
+
+    CorrelatedExprRewriter(UnnestedQuery unnestedQuery) {
+      this.unnestedQuery = unnestedQuery;
+    }
+
+    /** Rewrites a single expression against the given UnnestedQuery. */
+    static RexNode rewrite(
+        RexNode expr,
+        UnnestedQuery unnestedQuery) {
+      CorrelatedExprRewriter rewriter = new CorrelatedExprRewriter(unnestedQuery);
+      return expr.accept(rewriter);
+    }
+
+    /** Rewrites a list of expressions against the given UnnestedQuery, returning a new
+     * mutable list. */
+    static List<RexNode> rewrite(
+        List<RexNode> exprs,
+        UnnestedQuery unnestedQuery) {
+      CorrelatedExprRewriter rewriter = new CorrelatedExprRewriter(unnestedQuery);
+      return new ArrayList<>(rewriter.apply(exprs));
+    }
+
+    /** Shifts an input reference to its position in the new node; the same instance is
+     * returned when the position is unchanged. */
+    @Override public RexNode visitInputRef(RexInputRef inputRef) {
+      int newIndex = requireNonNull(unnestedQuery.oldToNewOutputs.get(inputRef.getIndex()));
+      if (newIndex == inputRef.getIndex()) {
+        return inputRef;
+      }
+      return new RexInputRef(newIndex, inputRef.getType());
+    }
+
+    /** Replaces a field access on a correlation variable with a reference to the output
+     * column of the new node that carries that free variable. */
+    @Override public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
+      if (fieldAccess.getReferenceExpr() instanceof RexCorrelVariable) {
+        RexCorrelVariable v =
+            (RexCorrelVariable) fieldAccess.getReferenceExpr();
+        CorDef corDef = new CorDef(v.id, fieldAccess.getField().getIndex());
+        int newIndex = requireNonNull(unnestedQuery.corDefOutputs.get(corDef));
+        return new RexInputRef(newIndex, fieldAccess.getType());
+      }
+      return super.visitFieldAccess(fieldAccess);
+    }
+
+    /** Rewrites all components of a window and appends the columns of D to its partition
+     * keys. */
+    @Override public RexWindow visitWindow(RexWindow window) {
+      // rewrites partition keys, order keys and bounds of the window
+      RexWindow shiftedWindow = super.visitWindow(window);
+      List<RexNode> newPartitionKeys = new ArrayList<>(shiftedWindow.partitionKeys);
+      for (Integer corIndex : unnestedQuery.corDefOutputs.values()) {
+        RexInputRef inputRef =
+            new RexInputRef(
+                corIndex,
+                unnestedQuery.r.getRowType().getFieldList().get(corIndex).getType());
+        newPartitionKeys.add(inputRef);
+      }
+      // every component must come from the rewritten window; taking order keys or bounds
+      // from the original window would keep stale input references and correlation variables
+      return unnestedQuery.r.getCluster().getRexBuilder().makeWindow(
+          newPartitionKeys,
+          shiftedWindow.orderKeys,
+          shiftedWindow.getLowerBound(),
+          shiftedWindow.getUpperBound(),
+          shiftedWindow.isRows(),
+          shiftedWindow.getExclude());
+    }
+  }
+
+  /** Returns this decorrelator itself. */
+  public TopDownGeneralDecorrelator getVisitor() {
+    return this;
+  }
+
+  /**
+   * Unnesting information of one relational node: the node before unnesting, the node after
+   * unnesting, where the free variables (CorDef) are output by the new node, and how the
+   * output positions of the old node map to output positions of the new node.
+   */
+  static class UnnestedQuery extends Frame {
+    /** The node before unnesting. */
+    final RelNode oldRel;
+
+    /**
+     * Creates a UnnestedQuery.
+     *
+     * @param oldRel          old node before unnesting
+     * @param r               new node after unnesting
+     * @param corDefOutputs   a sorted map from CorDef to output index in new node
+     * @param oldToNewOutputs a map from old node output index to new node output index
+     */
+    UnnestedQuery(RelNode oldRel, RelNode r, NavigableMap<CorDef, Integer> 
corDefOutputs,
+        Map<Integer, Integer> oldToNewOutputs) {
+      super(oldRel, r, corDefOutputs, oldToNewOutputs);
+      this.oldRel = oldRel;
+    }
+
+    /**
+     * Create UnnestedQuery for Join/Correlate after decorrelating.
+     *
+     * <p>The output mapping starts with the mapping of the left side. For SEMI and ANTI
+     * nothing is added; for LEFT_MARK one more position (the one directly after the left
+     * fields) is mapped; for all other join types the mapping of the right side is added,
+     * shifted by the left field counts. The CorDef outputs are taken from the left side when
+     * it has any, otherwise from the right side shifted by the new left field count.
+     *
+     * @param leftInfo          UnnestedQuery of the left side
+     * @param rightInfo         UnnestedQuery of the right side
+     * @param oriJoinNode       original Join/Correlate node
+     * @param unnestedJoinNode  new node after decorrelating
+     * @param joinRelType       join type of original Join/Correlate
+     * @return UnnestedQuery
+     * @throws IllegalArgumentException if neither side has CorDef outputs
+     */
+    private static UnnestedQuery createJoinUnnestInfo(
+        UnnestedQuery leftInfo,
+        UnnestedQuery rightInfo,
+        RelNode oriJoinNode,
+        RelNode unnestedJoinNode,
+        JoinRelType joinRelType) {
+      Map<Integer, Integer> oldToNewOutputs = new HashMap<>();
+      oldToNewOutputs.putAll(leftInfo.oldToNewOutputs);
+      int oriLeftFieldCount = leftInfo.oldRel.getRowType().getFieldCount();
+      int newLeftFieldCount = leftInfo.r.getRowType().getFieldCount();
+      switch (joinRelType) {
+      case SEMI:
+      case ANTI:
+        // no mapping is added for the right side
+        break;
+      case LEFT_MARK:
+        // map the single position directly after the left fields
+        // NOTE(review): presumably this is the mark column - confirm
+        oldToNewOutputs.put(oriLeftFieldCount, newLeftFieldCount);
+        break;
+      default:
+        // right fields follow the left fields in both the old and the new node
+        rightInfo.oldToNewOutputs.forEach((oriIndex, newIndex) ->
+            oldToNewOutputs.put(
+                requireNonNull(oriIndex, "oriIndex") + oriLeftFieldCount,
+                requireNonNull(newIndex, "newIndex") + newLeftFieldCount));
+        break;
+      }
+
+      // NOTE(review): the D columns of the left side are preferred for every join type; for
+      // join types that generate NULLs on the left these columns may be NULL - confirm this
+      // is intended
+      TreeMap<CorDef, Integer> corDefOutputs = new TreeMap<>();
+      if (!leftInfo.corDefOutputs.isEmpty()) {
+        corDefOutputs.putAll(leftInfo.corDefOutputs);
+      } else if (!rightInfo.corDefOutputs.isEmpty()) {
+        Litmus.THROW.check(joinRelType.projectsRight(),
+            "If the joinType doesn't project right, its left side must have 
UnnestInfo.");
+        rightInfo.corDefOutputs.forEach((corDef, index) ->
+            corDefOutputs.put(corDef, index + newLeftFieldCount));
+      } else {
+        throw new IllegalArgumentException("The UnnestInfo for both sides of 
Join/Correlate that "
+            + "has correlation should not all be empty.");
+      }
+      return new UnnestedQuery(oriJoinNode, unnestedJoinNode, corDefOutputs, 
oldToNewOutputs);
+    }
+
+    /**
+     * Create the new join condition after decorrelating.
+     *
+     * <p>The original condition is rewritten against a temporary inner join of both sides,
+     * so that input references and free variables point to the columns of the new inputs.
+     * When requested, one IS NOT DISTINCT FROM comparison per CorDef between the D columns of
+     * both sides is added. The builder stack is left as it was found: every temporary node
+     * pushed here is built again.
+     *
+     * @param oriCondition              original Correlate/Join condition
+     * @param leftInfo                  UnnestedQuery of the left side
+     * @param rightInfo                 UnnestedQuery of the right side
+     * @param needNaturalJoinCondition  whether need to add the natural join condition for
+     *                                  domain D
+     * @param builder                   RelBuilder
+     * @param corDefs                   the CorDef in the current decorrelator context
+     * @return the new join condition
+     * @throws IllegalArgumentException if neither side has CorDef outputs
+     */
+    private static RexNode createUnnestedJoinCondition(
+        RexNode oriCondition,
+        UnnestedQuery leftInfo,
+        UnnestedQuery rightInfo,
+        boolean needNaturalJoinCondition,
+        RelBuilder builder,
+        NavigableSet<CorDef> corDefs) {
+      // create a temporary inner join and its UnnestedQuery to help rewrite the
+      // original condition by CorrelatedExprRewriter
+      Map<Integer, Integer> temporaryOldToNewOutputs = new HashMap<>();
+      int oriLeftFieldCount = leftInfo.oldRel.getRowType().getFieldCount();
+      int newLeftFieldCount = leftInfo.r.getRowType().getFieldCount();
+      temporaryOldToNewOutputs.putAll(leftInfo.oldToNewOutputs);
+      rightInfo.oldToNewOutputs.forEach((oriIndex, newIndex) ->
+          temporaryOldToNewOutputs.put(
+              requireNonNull(oriIndex, "oriIndex") + oriLeftFieldCount,
+              requireNonNull(newIndex, "newIndex") + newLeftFieldCount));
+
+      // free variables are read from the left side if it carries D, otherwise from the
+      // right side (shifted behind the new left fields)
+      TreeMap<CorDef, Integer> temporaryCorDefOutputs = new TreeMap<>();
+      if (!leftInfo.corDefOutputs.isEmpty()) {
+        temporaryCorDefOutputs.putAll(leftInfo.corDefOutputs);
+      } else if (!rightInfo.corDefOutputs.isEmpty()) {
+        rightInfo.corDefOutputs.forEach((corDef, index) ->
+            temporaryCorDefOutputs.put(corDef, index + newLeftFieldCount));
+      } else {
+        throw new IllegalArgumentException("The UnnestInfo for both sides of 
Join/Correlate that "
+            + "has correlation should not all be empty.");
+      }
+      RelNode temporaryOldRel = 
builder.push(leftInfo.oldRel).push(rightInfo.oldRel)
+          .join(JoinRelType.INNER)
+          .build();
+      RelNode temporaryNewRel = builder.push(leftInfo.r).push(rightInfo.r)
+          .join(JoinRelType.INNER)
+          .build();
+      UnnestedQuery temporaryInfo =
+          new UnnestedQuery(temporaryOldRel, temporaryNewRel,
+              temporaryCorDefOutputs, temporaryOldToNewOutputs);
+      RexNode rewriteOriCondition = 
CorrelatedExprRewriter.rewrite(oriCondition, temporaryInfo);
+      List<RexNode> unnestedJoinConditions = new ArrayList<>();
+      unnestedJoinConditions.add(rewriteOriCondition);
+
+      if (needNaturalJoinCondition) {
+        // match the D columns of both sides with IS NOT DISTINCT FROM, one comparison per
+        // CorDef
+        for (CorDef corDef : corDefs) {
+          int leftIndex = requireNonNull(leftInfo.corDefOutputs.get(corDef));
+          RelDataType leftColumnType
+              = 
leftInfo.r.getRowType().getFieldList().get(leftIndex).getType();
+          int rightIndex = requireNonNull(rightInfo.corDefOutputs.get(corDef));
+          RelDataType rightColumnType
+              = 
rightInfo.r.getRowType().getFieldList().get(rightIndex).getType();
+          RexNode notDistinctFrom =
+              builder.isNotDistinctFrom(
+                  new RexInputRef(leftIndex, leftColumnType),
+                  new RexInputRef(rightIndex + newLeftFieldCount, 
rightColumnType));
+          unnestedJoinConditions.add(notDistinctFrom);
+        }
+      }
+      return RexUtil.composeConjunction(builder.getRexBuilder(), 
unnestedJoinConditions);
+    }
+
+  }
+
+  /**
+   * The duplicate-free domain of the free variables, i.e. the relation denoted by D in the
+   * paper.
+   */
+  static class DedupFreeVarsNode {
+    /** The relational expression that produces D. */
+    final RelNode r;
+
+    DedupFreeVarsNode(RelNode r) {
+      this.r = r;
+    }
+
+    /**
+     * Generates the domain of the free variables D by projecting the columns of the left
+     * side that carry the free variables and removing duplicates.
+     *
+     * @param newLeft   the left side (without correlation) of Correlate
+     * @param leftInfo  the UnnestedQuery of the left side of Correlate
+     * @param corDefs   the CorDef in the current decorrelator context
+     * @param builder   RelBuilder
+     * @return the domain of the free variables D
+     */
+    static DedupFreeVarsNode create(
+        RelNode newLeft,
+        UnnestedQuery leftInfo,
+        NavigableSet<CorDef> corDefs,
+        RelBuilder builder) {
+      // one column of the left side per free variable, in the order of corDefs
+      final List<Integer> domainOrdinals = new ArrayList<>();
+      for (CorDef corDef : corDefs) {
+        domainOrdinals.add(requireNonNull(leftInfo.corDefOutputs.get(corDef)));
+      }
+      builder.push(newLeft);
+      final List<RexNode> domainColumns = builder.fields(domainOrdinals);
+      builder.project(domainColumns);
+      builder.distinct();
+      return new DedupFreeVarsNode(builder.build());
+    }
+  }
+
+}
diff --git a/core/src/main/java/org/apache/calcite/tools/RelBuilder.java 
b/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
index 154cca84a3..8f9e117974 100644
--- a/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
+++ b/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
@@ -3329,6 +3329,8 @@ public RelBuilder join(JoinRelType joinType, RexNode 
condition,
         filter(condition.accept(new Shifter(left.rel, id, right.rel)));
         right = stack.pop();
         break;
+      case LEFT_MARK:
+        break;
       case INNER:
         // For INNER, we can defer.
         postCondition = condition;
@@ -3337,9 +3339,15 @@ public RelBuilder join(JoinRelType joinType, RexNode 
condition,
         throw new IllegalArgumentException("Correlated " + joinType + " join 
is not supported");
       }
       final ImmutableBitSet requiredColumns = 
RelOptUtil.correlationColumns(id, right.rel);
-      join =
-          struct.correlateFactory.createCorrelate(left.rel, right.rel, 
ImmutableList.of(), id,
-              requiredColumns, joinType);
+      if (joinType == JoinRelType.LEFT_MARK) {
+        join =
+            
struct.conditionalCorrelateFactory.createConditionalCorrelate(left.rel, 
right.rel,
+                ImmutableList.of(), id, requiredColumns, joinType, condition);
+      } else {
+        join =
+            struct.correlateFactory.createCorrelate(left.rel, right.rel, 
ImmutableList.of(), id,
+                requiredColumns, joinType);
+      }
     } else {
       RelNode join0 =
           struct.joinFactory.createJoin(left.rel, right.rel,
diff --git a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java 
b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
index ff3cb03c36..ac7a55984f 100644
--- a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
+++ b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
@@ -11699,4 +11699,267 @@ private void checkLoptOptimizeJoinRule(LoptOptimizeJoinRule rule) {
         })
         .check();
   }
+
+  /** Test case of
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-7031">[CALCITE-7031]
+   * Implement the general decorrelation algorithm (Neumann & Kemper)</a>. */
+  @Test void testTopDownGeneralDecorrelateForFilterExists() {
+    final String sql = "select empno from emp where "
+        + "exists(select * from dept where dept.deptno = emp.deptno)";
+
+    sql(sql)
+        .withRule(
+            CoreRules.FILTER_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForFilterSome() {
+    final String sql = "select empno from emp where "
+        + "empno > SOME(select empno from emp_b where emp.ename = emp_b.ename)";
+
+    sql(sql)
+        .withRule(
+            CoreRules.FILTER_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForFilterNotIn() {
+    final String sql = "select empno from emp where "
+            + "empno not in (select empno from emp_b where emp.ename = emp_b.ename)";
+
+    sql(sql)
+        .withRule(
+            CoreRules.FILTER_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForFilterNotExists() {
+    final String sql = "select empno from emp where "
+            + "not exists(select * from emp_b where emp.ename = emp_b.ename)";
+
+    sql(sql)
+        .withRule(
+            CoreRules.FILTER_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForFilterScalar() {
+    final String sql = "select empno from emp where "
+        + "sal > (select avg(sal) from emp_b where emp.ename = emp_b.ename)";
+
+    sql(sql)
+        .withRule(
+            CoreRules.FILTER_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForSubqueryWithSetOp() {
+    final String sql = "select empno, (select sum(deptno) from ("
+        + "select deptno from emp_b where emp.empno = emp_b.empno "
+        + "union all select deptno from empnullables where emp.empno = empnullables.empno))"
+        + " from emp";
+
+    sql(sql)
+        .withRule(
+            CoreRules.PROJECT_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForSubqueryWithJoin() {
+    final String sql = "select empno from emp where sal > SOME("
+        + "select sal from empnullables, (select empno from emp_b where emp.deptno = emp_b.deptno)"
+        + " b where empnullables.empno = b.empno)";
+
+    sql(sql)
+        .withRule(
+            CoreRules.FILTER_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForCountScalar() {
+    final String sql = "select deptno, "
+        + "(select count(empno) from emp where dept.deptno = emp.deptno) from dept";
+
+    sql(sql)
+        .withRule(
+            CoreRules.PROJECT_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForProjectScalar() {
+    final String sql = "SELECT empno, sal + "
+        + "(SELECT avg(sal) FROM empdefaults where emp.deptno = empdefaults.deptno) "
+        + "FROM emp";
+
+    sql(sql)
+        .withRule(
+            CoreRules.PROJECT_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForProjectExists() {
+    final String sql = "SELECT dept.deptno, EXISTS ( SELECT 1 FROM emp e "
+        + "WHERE e.deptno = dept.deptno ) AS has_employees FROM dept";
+
+    sql(sql)
+        .withRule(
+            CoreRules.PROJECT_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForProjectIn() {
+    final String sql = "SELECT emp.deptno, emp.deptno IN (SELECT dept.deptno FROM dept "
+        + "where dept.deptno < emp.empno ) FROM emp";
+
+    sql(sql)
+        .withRule(
+            CoreRules.PROJECT_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForLateralJoin() {
+    final String sql = "select empno from emp,\n"
+        + " LATERAL (select * from dept where emp.deptno = dept.deptno)";
+
+    sql(sql)
+        .withRule(
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForTwoLevelCorrelate() {
+    final String sql = "select empno from emp where "
+        + "exists(select * from emp_b where emp.ename = emp_b.ename and "
+        + "exists(select * from empnullables where emp.empno = empnullables.empno and "
+        + "emp_b.deptno = empnullables.deptno))";
+
+    sql(sql)
+        .withRule(
+            CoreRules.FILTER_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForTwoLevelCorrelate2() {
+    final String sql = "select empno from emp where "
+        + "exists(select * from emp_b where emp.ename = emp_b.ename and "
+        + "exists(select * from empnullables where emp.empno = empnullables.empno))";
+
+    sql(sql)
+        .withRule(
+            CoreRules.FILTER_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForTwoLevelCorrelate3() {
+    final String sql = "SELECT deptno FROM emp e WHERE EXISTS (SELECT * FROM dept d WHERE EXISTS "
+        + "(SELECT * FROM bonus ea WHERE ea.ENAME = e.ENAME AND d.deptno = e.deptno))";
+
+    sql(sql)
+        .withRule(
+            CoreRules.FILTER_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForCannotRemoveD() {
+    final String sql = "select empno from emp where "
+        + "exists(select * from empnullables where emp.deptno > empnullables.deptno)";
+
+    sql(sql)
+        .withRule(
+            CoreRules.FILTER_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForSubqueryWithSort() {
+    final String sql = "select empno from emp where "
+        + "sal > SOME(select sal from emp_b where emp.deptno = emp_b.deptno "
+        + "order by emp_b.sal limit 5)";
+
+    sql(sql)
+        .withRule(
+            CoreRules.FILTER_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
+  @Test void testTopDownGeneralDecorrelateForSubqueryWithCube() {
+    final String sql = "select empno from emp where "
+        + "sal < SOME(select avg(sal) from emp_b where emp.job = emp_b.job group by cube(deptno))";
+
+    sql(sql)
+        .withRule(
+            CoreRules.FILTER_SUB_QUERY_TO_MARK_CORRELATE,
+            CoreRules.PROJECT_MERGE,
+            CoreRules.PROJECT_REMOVE)
+        .withLateDecorrelate(true)
+        .withTopDownGeneralDecorrelate(true)
+        .check();
+  }
+
 }
diff --git a/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml b/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
index e1db15d0ce..73f362b3f0 100644
--- a/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
@@ -19587,6 +19587,734 @@ LogicalProject(EMPNO=[$0])
   LogicalJoin(condition=[=($7, $8)], joinType=[semi])
     LogicalTableScan(table=[[scott, EMP]])
     LogicalTableScan(table=[[scott, DEPT]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForCannotRemoveD">
+    <Resource name="sql">
+      <![CDATA[select empno from emp where exists(select * from empnullables where emp.deptno > empnullables.deptno)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[>($cor0.DEPTNO, $7)])
+  LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+})], variablesSet=[[$cor0]])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[$9])
+    LogicalConditionalCorrelate(correlation=[$cor0], joinType=[left_mark], requiredColumns=[{7}])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalFilter(condition=[>($cor0.DEPTNO, $7)])
+        LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalJoin(condition=[IS NOT DISTINCT FROM($7, $18)], joinType=[semi])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalJoin(condition=[>($9, $7)], joinType=[inner])
+      LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+      LogicalAggregate(group=[{0}])
+        LogicalProject(DEPTNO=[$7])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForCountScalar">
+    <Resource name="sql">
+      <![CDATA[select deptno, (select count(empno) from emp where dept.deptno = emp.deptno) from dept]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(variablesSet=[[$cor0]], DEPTNO=[$0], EXPR$1=[$SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject(EMPNO=[$0])
+    LogicalFilter(condition=[=($cor0.DEPTNO, $7)])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+})])
+  LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(DEPTNO=[$0], EXPR$1=[$2])
+  LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0}])
+    LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+    LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+      LogicalProject(EMPNO=[$0])
+        LogicalFilter(condition=[=($cor0.DEPTNO, $7)])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(DEPTNO=[$0], EXPR$1=[$4])
+  LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $2)], joinType=[left])
+    LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+    LogicalProject(DEPTNO=[$0], DEPTNO0=[$1], $f2=[CASE(IS NOT NULL($2), $2, 0)])
+      LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[left])
+        LogicalProject(DEPTNO=[$0])
+          LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+        LogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
+          LogicalProject(DEPTNO=[$7])
+            LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForFilterExists">
+    <Resource name="sql">
+      <![CDATA[select empno from emp where exists(select * from dept where dept.deptno = emp.deptno)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[=($0, $cor0.DEPTNO)])
+  LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+})], variablesSet=[[$cor0]])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[$9])
+    LogicalConditionalCorrelate(correlation=[$cor0], joinType=[left_mark], requiredColumns=[{7}])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalFilter(condition=[=($0, $cor0.DEPTNO)])
+        LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalJoin(condition=[IS NOT DISTINCT FROM($7, $9)], joinType=[semi])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForFilterNotExists">
+    <Resource name="sql">
+      <![CDATA[select empno from emp where not exists(select * from emp_b where emp.ename = emp_b.ename)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[=($cor0.ENAME, $1)])
+  LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+}))], variablesSet=[[$cor0]])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[NOT($9)])
+    LogicalConditionalCorrelate(correlation=[$cor0], joinType=[left_mark], requiredColumns=[{1}])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalFilter(condition=[=($cor0.ENAME, $1)])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalJoin(condition=[IS NOT DISTINCT FROM($1, $10)], joinType=[anti])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForFilterNotIn">
+    <Resource name="sql">
+      <![CDATA[select empno from emp where empno not in (select empno from emp_b where emp.ename = emp_b.ename)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[NOT(IN($0, {
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[=($cor0.ENAME, $1)])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+}))], variablesSet=[[$cor0]])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[NOT($9)])
+    LogicalConditionalCorrelate(correlation=[$cor0], joinType=[left_mark], requiredColumns=[{1}], condition=[=($0, $9)])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalProject(EMPNO=[$0])
+        LogicalFilter(condition=[=($cor0.ENAME, $1)])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[NOT($9)])
+    LogicalJoin(condition=[AND(=($0, $9), IS NOT DISTINCT FROM($1, $10))], joinType=[left_mark])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalProject(EMPNO=[$0], ENAME=[$1])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForFilterScalar">
+    <Resource name="sql">
+      <![CDATA[select empno from emp where sal > (select avg(sal) from emp_b where emp.ename = emp_b.ename)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[>($5, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[AVG($0)])
+  LogicalProject(SAL=[$5])
+    LogicalFilter(condition=[=($cor0.ENAME, $1)])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+}))], variablesSet=[[$cor0]])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[>($5, $9)])
+    LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{1}])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalAggregate(group=[{}], EXPR$0=[AVG($0)])
+        LogicalProject(SAL=[$5])
+          LogicalFilter(condition=[=($cor0.ENAME, $1)])
+            LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalJoin(condition=[AND(IS NOT DISTINCT FROM($1, $9), >($5, $10))], joinType=[inner])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalAggregate(group=[{1}], EXPR$0=[AVG($0)])
+      LogicalProject(SAL=[$5], ENAME=[$1])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForFilterSome">
+    <Resource name="sql">
+      <![CDATA[select empno from emp where empno > SOME(select empno from emp_b where emp.ename = emp_b.ename)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[> SOME($0, {
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[=($cor0.ENAME, $1)])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+})], variablesSet=[[$cor0]])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[$9])
+    LogicalConditionalCorrelate(correlation=[$cor0], joinType=[left_mark], requiredColumns=[{1}], condition=[>($0, $9)])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalProject(EMPNO=[$0])
+        LogicalFilter(condition=[=($cor0.ENAME, $1)])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalJoin(condition=[AND(>($0, $9), IS NOT DISTINCT FROM($1, $10))], joinType=[semi])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalProject(EMPNO=[$0], ENAME=[$1])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForLateralJoin">
+    <Resource name="sql">
+      <![CDATA[select empno from emp,
+ LATERAL (select * from dept where emp.deptno = dept.deptno)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{7}])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalProject(DEPTNO=[$0], NAME=[$1])
+      LogicalFilter(condition=[=($cor0.DEPTNO, $0)])
+        LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{7}])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalFilter(condition=[=($cor0.DEPTNO, $0)])
+      LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalJoin(condition=[IS NOT DISTINCT FROM($7, $9)], joinType=[inner])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForProjectExists">
+    <Resource name="sql">
+      <![CDATA[SELECT dept.deptno, EXISTS ( SELECT 1 FROM emp e WHERE e.deptno = dept.deptno ) AS has_employees FROM dept]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(variablesSet=[[$cor0]], DEPTNO=[$0], HAS_EMPLOYEES=[EXISTS({
+LogicalFilter(condition=[=($7, $cor0.DEPTNO)])
+  LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+})])
+  LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(DEPTNO=[$0], HAS_EMPLOYEES=[$2])
+  LogicalConditionalCorrelate(correlation=[$cor0], joinType=[left_mark], requiredColumns=[{0}])
+    LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+    LogicalFilter(condition=[=($7, $cor0.DEPTNO)])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(DEPTNO=[$0], HAS_EMPLOYEES=[$2])
+  LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $9)], joinType=[left_mark])
+    LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForProjectIn">
+    <Resource name="sql">
+      <![CDATA[SELECT emp.deptno, emp.deptno IN (SELECT dept.deptno FROM dept where dept.deptno < emp.empno ) FROM emp]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(variablesSet=[[$cor0]], DEPTNO=[$7], EXPR$1=[IN($7, {
+LogicalProject(DEPTNO=[$0])
+  LogicalFilter(condition=[<($0, $cor0.EMPNO)])
+    LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+})])
+  LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(DEPTNO=[$7], EXPR$1=[$9])
+  LogicalConditionalCorrelate(correlation=[$cor0], joinType=[left_mark], requiredColumns=[{0}], condition=[=($7, $9)])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalProject(DEPTNO=[$0])
+      LogicalFilter(condition=[<($0, $cor0.EMPNO)])
+        LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(DEPTNO=[$7], EXPR$1=[$9])
+  LogicalJoin(condition=[AND(=($7, $9), IS NOT DISTINCT FROM($0, $10))], joinType=[left_mark])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalProject(DEPTNO=[$0], EMPNO=[$2])
+      LogicalJoin(condition=[<($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+        LogicalProject(EMPNO=[$0])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForProjectScalar">
+    <Resource name="sql">
+      <![CDATA[SELECT empno, sal + (SELECT avg(sal) FROM empdefaults where emp.deptno = empdefaults.deptno) FROM emp]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(variablesSet=[[$cor0]], EMPNO=[$0], EXPR$1=[+($5, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[AVG($0)])
+  LogicalProject(SAL=[$5])
+    LogicalFilter(condition=[=($cor0.DEPTNO, $7)])
+      LogicalTableScan(table=[[CATALOG, SALES, EMPDEFAULTS]])
+}))])
+  LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(EMPNO=[$0], EXPR$1=[+($5, $9)])
+  LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{7}])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalAggregate(group=[{}], EXPR$0=[AVG($0)])
+      LogicalProject(SAL=[$5])
+        LogicalFilter(condition=[=($cor0.DEPTNO, $7)])
+          LogicalTableScan(table=[[CATALOG, SALES, EMPDEFAULTS]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(EMPNO=[$0], EXPR$1=[+($5, $10)])
+  LogicalJoin(condition=[IS NOT DISTINCT FROM($7, $9)], joinType=[left])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalAggregate(group=[{1}], EXPR$0=[AVG($0)])
+      LogicalProject(SAL=[$5], DEPTNO=[$7])
+        LogicalFilter(condition=[IS NOT NULL($7)])
+          LogicalTableScan(table=[[CATALOG, SALES, EMPDEFAULTS]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForSubqueryWithCube">
+    <Resource name="sql">
+      <![CDATA[select empno from emp where sal < SOME(select avg(sal) from emp_b where emp.job = emp_b.job group by cube(deptno))]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[< SOME($5, {
+LogicalProject(EXPR$0=[$1])
+  LogicalAggregate(group=[{0}], groups=[[{0}, {}]], EXPR$0=[AVG($1)])
+    LogicalProject(DEPTNO=[$7], SAL=[$5])
+      LogicalFilter(condition=[=($cor0.JOB, $2)])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+})], variablesSet=[[$cor0]])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[$9])
+    LogicalConditionalCorrelate(correlation=[$cor0], joinType=[left_mark], requiredColumns=[{2}], condition=[<($5, $9)])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalProject(EXPR$0=[$1])
+        LogicalAggregate(group=[{0}], groups=[[{0}, {}]], EXPR$0=[AVG($1)])
+          LogicalProject(DEPTNO=[$7], SAL=[$5])
+            LogicalFilter(condition=[=($cor0.JOB, $2)])
+              LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalJoin(condition=[AND(<($5, $9), IS NOT DISTINCT FROM($2, $10))], joinType=[semi])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalProject(EXPR$0=[$3], JOB=[$0])
+      LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $2)], joinType=[left])
+        LogicalAggregate(group=[{0}])
+          LogicalProject(JOB=[$2])
+            LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+        LogicalAggregate(group=[{0, 2}], groups=[[{0, 2}, {2}]], EXPR$0=[AVG($1)])
+          LogicalProject(DEPTNO=[$7], SAL=[$5], JOB=[$2])
+            LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForSubqueryWithJoin">
+    <Resource name="sql">
+      <![CDATA[select empno from emp where sal > SOME(select sal from empnullables, (select empno from emp_b where emp.deptno = emp_b.deptno) b where empnullables.empno = b.empno)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[> SOME($5, {
+LogicalProject(SAL=[$5])
+  LogicalFilter(condition=[=($0, $9)])
+    LogicalJoin(condition=[true], joinType=[inner])
+      LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+      LogicalProject(EMPNO=[$0])
+        LogicalFilter(condition=[=($cor0.DEPTNO, $7)])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+})], variablesSet=[[$cor0]])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[$9])
+    LogicalConditionalCorrelate(correlation=[$cor0], joinType=[left_mark], requiredColumns=[{7}], condition=[>($5, $9)])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalProject(SAL=[$5])
+        LogicalFilter(condition=[=($0, $9)])
+          LogicalJoin(condition=[true], joinType=[inner])
+            LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+            LogicalProject(EMPNO=[$0])
+              LogicalFilter(condition=[=($cor0.DEPTNO, $7)])
+                LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalJoin(condition=[AND(>($5, $9), IS NOT DISTINCT FROM($7, $10))], joinType=[semi])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalProject(SAL=[$5], DEPTNO0=[$10])
+      LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+        LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+        LogicalProject(EMPNO=[$0], DEPTNO=[$7])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForSubqueryWithSetOp">
+    <Resource name="sql">
+      <![CDATA[select empno, (select sum(deptno) from (select deptno from emp_b where emp.empno = emp_b.empno union all select deptno from empnullables where emp.empno = empnullables.empno)) from emp]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(variablesSet=[[$cor0]], EMPNO=[$0], EXPR$1=[$SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[SUM($0)])
+  LogicalUnion(all=[true])
+    LogicalProject(DEPTNO=[$7])
+      LogicalFilter(condition=[=($cor0.EMPNO, $0)])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+    LogicalProject(DEPTNO=[$7])
+      LogicalFilter(condition=[=($cor0.EMPNO, $0)])
+        LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+})])
+  LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(EMPNO=[$0], EXPR$1=[$9])
+  LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0}])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalAggregate(group=[{}], EXPR$0=[SUM($0)])
+      LogicalUnion(all=[true])
+        LogicalProject(DEPTNO=[$7])
+          LogicalFilter(condition=[=($cor0.EMPNO, $0)])
+            LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+        LogicalProject(DEPTNO=[$7])
+          LogicalFilter(condition=[=($cor0.EMPNO, $0)])
+            LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(EMPNO=[$0], EXPR$1=[$10])
+  LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $9)], joinType=[left])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalAggregate(group=[{1}], EXPR$0=[SUM($0)])
+      LogicalUnion(all=[true])
+        LogicalProject(DEPTNO=[$7], EMPNO=[$0])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+        LogicalProject(DEPTNO=[$7], EMPNO=[$0])
+          LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForSubqueryWithSort">
+    <Resource name="sql">
+      <![CDATA[select empno from emp where sal > SOME(select sal from emp_b 
where emp.deptno = emp_b.deptno order by emp_b.sal limit 5)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[> SOME($5, {
+LogicalSort(sort0=[$0], dir0=[ASC], fetch=[5])
+  LogicalProject(SAL=[$5])
+    LogicalFilter(condition=[=($cor0.DEPTNO, $7)])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+})], variablesSet=[[$cor0]])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[$9])
+    LogicalConditionalCorrelate(correlation=[$cor0], joinType=[left_mark], 
requiredColumns=[{7}], condition=[>($5, $9)])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalSort(sort0=[$0], dir0=[ASC], fetch=[5])
+        LogicalProject(SAL=[$5])
+          LogicalFilter(condition=[=($cor0.DEPTNO, $7)])
+            LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalJoin(condition=[AND(>($5, $9), IS NOT DISTINCT FROM($7, $10))], 
joinType=[semi])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalFilter(condition=[<=($2, 5)])
+      LogicalProject(SAL=[$5], DEPTNO=[$7], $f2=[ROW_NUMBER() OVER (PARTITION 
BY $7 ORDER BY $5 NULLS LAST)])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForTwoLevelCorrelate">
+    <Resource name="sql">
+      <![CDATA[select empno from emp where exists(select * from emp_b where 
emp.ename = emp_b.ename and exists(select * from empnullables where emp.empno = 
empnullables.empno and emp_b.deptno = empnullables.deptno))]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(=($cor0.ENAME, $1), EXISTS({
+LogicalFilter(condition=[AND(=($cor0.EMPNO, $0), =($cor2.DEPTNO, $7))])
+  LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+}))], variablesSet=[[$cor2]])
+  LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+})], variablesSet=[[$cor0]])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[$9])
+    LogicalConditionalCorrelate(correlation=[$cor0], joinType=[left_mark], 
requiredColumns=[{0, 1}])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], 
HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8], BIRTHDATE=[$9])
+        LogicalFilter(condition=[AND(=($cor0.ENAME, $1), $10)])
+          LogicalConditionalCorrelate(correlation=[$cor2], 
joinType=[left_mark], requiredColumns=[{7}])
+            LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+            LogicalFilter(condition=[AND(=($cor0.EMPNO, $0), =($cor2.DEPTNO, 
$7))])
+              LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $19), IS NOT DISTINCT 
FROM($1, $20))], joinType=[semi])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalJoin(condition=[AND(IS NOT DISTINCT FROM($10, $21), IS NOT DISTINCT 
FROM($11, $22), IS NOT DISTINCT FROM($7, $23))], joinType=[semi])
+      LogicalJoin(condition=[=($11, $1)], joinType=[inner])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+        LogicalProject(EMPNO=[$0], ENAME=[$1])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalJoin(condition=[AND(=($9, $0), =($11, $7))], joinType=[inner])
+        LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+        LogicalAggregate(group=[{0, 1, 2}])
+          LogicalProject(EMPNO0=[$10], ENAME0=[$11], DEPTNO=[$7])
+            LogicalJoin(condition=[true], joinType=[inner])
+              LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+              LogicalProject(EMPNO=[$0], ENAME=[$1])
+                LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForTwoLevelCorrelate2">
+    <Resource name="sql">
+      <![CDATA[select empno from emp where exists(select * from emp_b where 
emp.ename = emp_b.ename and exists(select * from empnullables where emp.empno = 
empnullables.empno))]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(=($cor0.ENAME, $1), EXISTS({
+LogicalFilter(condition=[=($cor0.EMPNO, $0)])
+  LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+}))])
+  LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+})], variablesSet=[[$cor0]])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalFilter(condition=[$9])
+    LogicalConditionalCorrelate(correlation=[$cor0], joinType=[left_mark], 
requiredColumns=[{0, 1}])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], 
HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8], BIRTHDATE=[$9])
+        LogicalFilter(condition=[AND(=($cor0.ENAME, $1), $10)])
+          LogicalJoin(condition=[true], joinType=[left_mark])
+            LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+            LogicalFilter(condition=[=($cor0.EMPNO, $0)])
+              LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $19), IS NOT DISTINCT 
FROM($1, $20))], joinType=[semi])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalJoin(condition=[AND(IS NOT DISTINCT FROM($10, $21), IS NOT DISTINCT 
FROM($11, $22))], joinType=[semi])
+      LogicalJoin(condition=[=($11, $1)], joinType=[inner])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP_B]])
+        LogicalProject(EMPNO=[$0], ENAME=[$1])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalJoin(condition=[=($9, $0)], joinType=[inner])
+        LogicalTableScan(table=[[CATALOG, SALES, EMPNULLABLES]])
+        LogicalProject(EMPNO=[$0], ENAME=[$1])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTopDownGeneralDecorrelateForTwoLevelCorrelate3">
+    <Resource name="sql">
+      <![CDATA[SELECT deptno FROM emp e WHERE EXISTS (SELECT * FROM dept d 
WHERE EXISTS (SELECT * FROM bonus ea WHERE ea.ENAME = e.ENAME AND d.deptno = 
e.deptno))]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(DEPTNO=[$7])
+  LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(=($0, $cor0.ENAME), =($cor1.DEPTNO, 
$cor0.DEPTNO))])
+  LogicalTableScan(table=[[CATALOG, SALES, BONUS]])
+})], variablesSet=[[$cor1]])
+  LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+})], variablesSet=[[$cor0]])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planMid">
+      <![CDATA[
+LogicalProject(DEPTNO=[$7])
+  LogicalFilter(condition=[$9])
+    LogicalConditionalCorrelate(correlation=[$cor0], joinType=[left_mark], 
requiredColumns=[{1, 7}])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalProject(DEPTNO=[$0], NAME=[$1])
+        LogicalFilter(condition=[$2])
+          LogicalConditionalCorrelate(correlation=[$cor1], 
joinType=[left_mark], requiredColumns=[{0}])
+            LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+            LogicalFilter(condition=[AND(=($0, $cor0.ENAME), =($cor1.DEPTNO, 
$cor0.DEPTNO))])
+              LogicalTableScan(table=[[CATALOG, SALES, BONUS]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(DEPTNO=[$7])
+  LogicalJoin(condition=[AND(IS NOT DISTINCT FROM($1, $11), IS NOT DISTINCT 
FROM($7, $12))], joinType=[semi])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalJoin(condition=[AND(IS NOT DISTINCT FROM($2, $8), IS NOT DISTINCT 
FROM($3, $9), IS NOT DISTINCT FROM($0, $10))], joinType=[semi])
+      LogicalJoin(condition=[true], joinType=[inner])
+        LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+        LogicalAggregate(group=[{0, 1}])
+          LogicalProject(ENAME=[$1], DEPTNO=[$7])
+            LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+        LogicalTableScan(table=[[CATALOG, SALES, BONUS]])
+        LogicalProject(ENAME=[$2], DEPTNO0=[$3], DEPTNO=[$0])
+          LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+            LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+            LogicalAggregate(group=[{0, 1}])
+              LogicalProject(ENAME=[$1], DEPTNO=[$7])
+                LogicalTableScan(table=[[CATALOG, SALES, EMP]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/testkit/src/main/java/org/apache/calcite/test/RelOptFixture.java 
b/testkit/src/main/java/org/apache/calcite/test/RelOptFixture.java
index 55e8ba283c..68a6111d06 100644
--- a/testkit/src/main/java/org/apache/calcite/test/RelOptFixture.java
+++ b/testkit/src/main/java/org/apache/calcite/test/RelOptFixture.java
@@ -41,6 +41,7 @@
 import org.apache.calcite.sql.validate.SqlConformance;
 import org.apache.calcite.sql2rel.RelDecorrelator;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.sql2rel.TopDownGeneralDecorrelator;
 import org.apache.calcite.test.catalog.MockCatalogReaderDynamic;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.Closer;
@@ -81,7 +82,7 @@ public class RelOptFixture {
   static final RelOptFixture DEFAULT =
       new RelOptFixture(SqlToRelFixture.TESTER, SqlTestFactory.INSTANCE,
           null, RelSupplier.NONE, null, null,
-          ImmutableMap.of(), (f, r) -> r, (f, r) -> r, false, false)
+          ImmutableMap.of(), (f, r) -> r, (f, r) -> r, false, false, false)
           .withFactory(f ->
               f.withValidatorConfig(c -> c.withIdentifierExpansion(true))
                   .withSqlToRelConfig(c -> c.withExpand(false)))
@@ -102,6 +103,7 @@ public class RelOptFixture {
   final BiFunction<RelOptFixture, RelNode, RelNode> after;
   final boolean decorrelate;
   final boolean lateDecorrelate;
+  final boolean topDownGeneralDecorrelate;
 
   RelOptFixture(SqlTester tester, SqlTestFactory factory,
       @Nullable DiffRepository diffRepos, RelSupplier relSupplier,
@@ -109,7 +111,7 @@ public class RelOptFixture {
       ImmutableMap<Hook, Consumer<Object>> hooks,
       BiFunction<RelOptFixture, RelNode, RelNode> before,
       BiFunction<RelOptFixture, RelNode, RelNode> after,
-      boolean decorrelate, boolean lateDecorrelate) {
+      boolean decorrelate, boolean lateDecorrelate, boolean 
topDownGeneralDecorrelate) {
     this.tester = requireNonNull(tester, "tester");
     this.factory = factory;
     this.diffRepos = diffRepos;
@@ -121,6 +123,7 @@ public class RelOptFixture {
     this.hooks = requireNonNull(hooks, "hooks");
     this.decorrelate = decorrelate;
     this.lateDecorrelate = lateDecorrelate;
+    this.topDownGeneralDecorrelate = topDownGeneralDecorrelate;
   }
 
   public RelOptFixture withDiffRepos(DiffRepository diffRepos) {
@@ -129,7 +132,7 @@ public RelOptFixture withDiffRepos(DiffRepository 
diffRepos) {
     }
     return new RelOptFixture(tester, factory, diffRepos, relSupplier,
         preProgram, planner, hooks, before, after, decorrelate,
-        lateDecorrelate);
+        lateDecorrelate, topDownGeneralDecorrelate);
   }
 
   public RelOptFixture withRelSupplier(RelSupplier relSupplier) {
@@ -138,7 +141,7 @@ public RelOptFixture withRelSupplier(RelSupplier 
relSupplier) {
     }
     return new RelOptFixture(tester, factory, diffRepos, relSupplier,
         preProgram, planner, hooks, before, after, decorrelate,
-        lateDecorrelate);
+        lateDecorrelate, topDownGeneralDecorrelate);
   }
 
   public RelOptFixture sql(String sql) {
@@ -156,7 +159,7 @@ public RelOptFixture withBefore(
         (sql, r) -> transform.apply(this, before0.apply(this, r));
     return new RelOptFixture(tester, factory, diffRepos, relSupplier,
         preProgram, planner, hooks, before, after, decorrelate,
-        lateDecorrelate);
+        lateDecorrelate, topDownGeneralDecorrelate);
   }
 
   public RelOptFixture withAfter(
@@ -166,7 +169,7 @@ public RelOptFixture withAfter(
         (sql, r) -> transform.apply(this, after0.apply(this, r));
     return new RelOptFixture(tester, factory, diffRepos, relSupplier,
         preProgram, planner, hooks, before, after, decorrelate,
-        lateDecorrelate);
+        lateDecorrelate, topDownGeneralDecorrelate);
   }
 
   public RelOptFixture withDynamicTable() {
@@ -180,7 +183,7 @@ public RelOptFixture 
withFactory(UnaryOperator<SqlTestFactory> transform) {
     }
     return new RelOptFixture(tester, factory, diffRepos, relSupplier,
         preProgram, planner, hooks, before, after, decorrelate,
-        lateDecorrelate);
+        lateDecorrelate, topDownGeneralDecorrelate);
   }
 
   public RelOptFixture withPre(HepProgram preProgram) {
@@ -189,7 +192,7 @@ public RelOptFixture withPre(HepProgram preProgram) {
     }
     return new RelOptFixture(tester, factory, diffRepos, relSupplier,
         preProgram, planner, hooks, before, after, decorrelate,
-        lateDecorrelate);
+        lateDecorrelate, topDownGeneralDecorrelate);
   }
 
   public RelOptFixture withPreRule(RelOptRule... rules) {
@@ -206,7 +209,7 @@ public RelOptFixture withPlanner(RelOptPlanner planner) {
     }
     return new RelOptFixture(tester, factory, diffRepos, relSupplier,
         preProgram, planner, hooks, before, after, decorrelate,
-        lateDecorrelate);
+        lateDecorrelate, topDownGeneralDecorrelate);
   }
 
   public RelOptFixture withProgram(HepProgram program) {
@@ -235,7 +238,7 @@ public <T> RelOptFixture withHook(Hook hook, Consumer<T> 
handler) {
     }
     return new RelOptFixture(tester, factory, diffRepos, relSupplier,
         preProgram, planner, hooks, before, after, decorrelate,
-        lateDecorrelate);
+        lateDecorrelate, topDownGeneralDecorrelate);
   }
 
   public <V> RelOptFixture withProperty(Hook hook, V value) {
@@ -270,7 +273,16 @@ public RelOptFixture withLateDecorrelate(final boolean 
lateDecorrelate) {
     }
     return new RelOptFixture(tester, factory, diffRepos, relSupplier,
         preProgram, planner, hooks, before, after, decorrelate,
-        lateDecorrelate);
+        lateDecorrelate, topDownGeneralDecorrelate);
+  }
+
+  public RelOptFixture withTopDownGeneralDecorrelate(final boolean 
topDownGeneralDecorrelate) {
+    if (topDownGeneralDecorrelate == this.topDownGeneralDecorrelate) {
+      return this;
+    }
+    return new RelOptFixture(tester, factory, diffRepos, relSupplier,
+        preProgram, planner, hooks, before, after, decorrelate,
+        lateDecorrelate, topDownGeneralDecorrelate);
   }
 
   public RelOptFixture withDecorrelate(final boolean decorrelate) {
@@ -279,7 +291,7 @@ public RelOptFixture withDecorrelate(final boolean 
decorrelate) {
     }
     return new RelOptFixture(tester, factory, diffRepos, relSupplier,
         preProgram, planner, hooks, before, after, decorrelate,
-        lateDecorrelate);
+        lateDecorrelate, topDownGeneralDecorrelate);
   }
 
   public RelOptFixture withTrim(final boolean trim) {
@@ -389,7 +401,10 @@ private void checkPlanning(boolean unchanged) {
       assertThat(r3, relIsValid());
       final RelBuilder relBuilder =
           RelFactories.LOGICAL_BUILDER.create(cluster, null);
-      r4 = RelDecorrelator.decorrelateQuery(r3, relBuilder);
+      r4 =
+          topDownGeneralDecorrelate
+              ? TopDownGeneralDecorrelator.decorrelateQuery(r3, relBuilder)
+              : RelDecorrelator.decorrelateQuery(r3, relBuilder);
     } else {
       r4 = r3;
     }


Reply via email to