This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ffc8ac935e2 [SPARK-36124][SQL] Support subqueries with correlation 
through INTERSECT/EXCEPT
ffc8ac935e2 is described below

commit ffc8ac935e24c7d15a700034bd556c2f4b8271ee
Author: Jack Chen <[email protected]>
AuthorDate: Wed Feb 22 13:22:59 2023 +0800

    [SPARK-36124][SQL] Support subqueries with correlation through 
INTERSECT/EXCEPT
    
    ## What changes were proposed in this pull request?
    
    Adds support for subquery decorrelation with INTERSECT and EXCEPT operators 
on the correlation paths. For example:
    ```
    SELECT t1a, (
      SELECT avg(b) FROM (
        SELECT t2b as b FROM t2 WHERE t2a = t1a
        INTERSECT
        SELECT t3b as b FROM t3 WHERE t3a = t1a
    ))
    FROM t1
    ```
    
    This uses the same logic as for UNION decorrelation added in 
https://github.com/apache/spark/pull/39375. The only real change is logic added 
to handle INTERSECT/EXCEPT DISTINCT, which are rewritten to semi/anti join and 
require extra logic in rewriteDomainJoins.
    
    [This 
doc](https://docs.google.com/document/d/11b9ClCF2jYGU7vU2suOT7LRswYkg6tZ8_6xJbvxfh2I/edit#)
 describes how the decorrelation rewrite works for set operations and the code 
changes for it - see the INTERSECT/EXCEPT section in particular.
    
    In this PR, we always add DomainJoins for correlation through 
INTERSECT/EXCEPT, and never do direct substitution of the outer refs. That can 
also be added as an optimization in a follow-up - it only affects performance, 
not surface area coverage.
    
    ### Why are the changes needed?
    To improve subquery support in Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    Before this change, queries like this would return an error like: 
`Decorrelate inner query through Intersect is not supported.`
    
    After this PR, this query can run successfully.
    
    ### How was this patch tested?
    Unit tests and SQL query tests.
    
    Moved the UNION decorrelation SQL tests to file scalar-subquery-set-op.sql 
and duplicated them to test each of [UNION/INTERSECT/EXCEPT] [ALL/DISTINCT]
    
    Factors tested included:
    - Subquery type:
      - Eligible for DecorrelateInnerQuery: Scalar, lateral join
      - Not supported: IN, EXISTS
    - UNION inside and outside subquery
    - Correlation in where, project, group by, aggregates, or no correlation
    - Project, Aggregate, Window under the Union
    - COUNT bug
    
    Closes #39759 from jchen5/subq-intersect.
    
    Authored-by: Jack Chen <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      |    2 +-
 .../catalyst/optimizer/DecorrelateInnerQuery.scala |  143 ++-
 .../optimizer/DecorrelateInnerQuerySuite.scala     |  112 ++-
 .../resources/sql-tests/inputs/join-lateral.sql    |  150 ++-
 .../exists-subquery/exists-joins-and-set-ops.sql   |   86 +-
 .../subquery/in-subquery/in-set-operations.sql     |  109 ++
 .../scalar-subquery/scalar-subquery-select.sql     |  105 --
 .../scalar-subquery/scalar-subquery-set-op.sql     |  621 ++++++++++++
 .../sql-tests/results/join-lateral.sql.out         |  249 ++++-
 .../exists-joins-and-set-ops.sql.out               |  216 +++-
 .../subquery/in-subquery/in-set-operations.sql.out |  348 +++++++
 .../scalar-subquery/scalar-subquery-select.sql.out |  195 ----
 .../scalar-subquery/scalar-subquery-set-op.sql.out | 1043 ++++++++++++++++++++
 .../scala/org/apache/spark/sql/SubquerySuite.scala |   78 +-
 14 files changed, 3085 insertions(+), 372 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 77948735dbe..fafdd679aa5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1231,7 +1231,7 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
         case p @ (_: ResolvedHint | _: LeafNode | _: Repartition | _: 
SubqueryAlias) =>
           p.children.foreach(child => checkPlan(child, aggregated, 
canContainOuter))
 
-        case p @ (_ : Union) =>
+        case p @ (_ : Union | _: SetOperation) =>
           // Set operations (e.g. UNION) containing correlated values are only 
supported
           // with DecorrelateInnerQuery framework.
           val childCanContainOuter = (canContainOuter
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index 069279a7a04..01029fe1af0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.collection.Utils
 
 /**
@@ -254,24 +255,127 @@ object DecorrelateInnerQuery extends PredicateHelper {
   /**
    * Rewrites a domain join cond so that it can be pushed to the right side of 
a
    * union/intersect/except operator.
+   *
+   * Example: Take a query like:
+   * select * from t0 join lateral (
+   *   select a from t1 where b < t0.x
+   *   union all
+   *   select b from t2 where c < t0.y)
+   *
+   * over tables t0(x, y), t1(a), t2(b).
+   *
+   * Step 1: After DecorrelateInnerQuery runs to introduce DomainJoins,
+   * we have outer table t0 with attributes [x#1, y#2] and the subquery is a 
Union where
+   * - the left side has DomainJoin [t0.x#4, t0.y#5] and output [t1.a#3, 
t0.x#4, t0.y#5]
+   * - the right side has DomainJoin [t0.x#7, t0.y#8] and output [t2.b#6, 
t0.x#7, t0.y#8]
+   * Here all the x and y attributes are from t0, but they are different 
instances from
+   * different joins of t0.
+   *
+   * The domain join conditions are x#4 <=> x#1 and y#5 <=> y#2, i.e. it joins 
the attributes from
+   * the original outer table with the attributes coming out of the DomainJoin 
of the left side,
+   * because the output of a set op uses the attribute names from the left 
side.
+   *
+   * Step 2: rewriteDomainJoins runs, in which we arrive at this function.
+   * In this function, we construct the domain join conditions for the 
children of the Union.
+   * For the left side, those remain unchanged, while for the right side they 
are remapped to
+   * use the attribute names of the right-side DomainJoin: x#7 <=> x#1 and y#8 
<=> y#2.
    */
-  def pushConditionsThroughUnion(
+  def pushDomainConditionsThroughSetOperation(
       conditions: Seq[Expression],
-      union: Union,
+      setOp: LogicalPlan, // Union or SetOperation
       child: LogicalPlan): Seq[Expression] = {
     // The output attributes are always equal to the left child's output
-    assert(union.output.size == child.output.size)
-    val map = AttributeMap(union.output.zip(child.output))
-    conditions.map {
+    assert(setOp.output.size == child.output.size)
+    val map = AttributeMap(setOp.output.zip(child.output))
+    conditions.collect {
       // The left hand side is the domain attribute used in the inner query 
and the right hand side
       // is the attribute from the outer query. (See comment above in 
buildDomainAttrMap.)
       // We need to remap the attribute names used in the inner query (left 
hand side) to account
       // for the different names in each union child. We should not remap the 
attribute names used
       // in the outer query.
+      //
+      // Note: the reason we can't just use the original joinCond from when 
the DomainJoin was
+      // constructed is that constructing the DomainJoins happens much earlier 
than rewriting the
+      // DomainJoins into actual joins, with many optimization steps in
+      // between, which could change the attributes involved (e.g. 
CollapseProject).
       case EqualNullSafe(left: Attribute, right: Expression) =>
         EqualNullSafe(map.getOrElse(left, left), right)
-      case EqualTo(left: Attribute, right: Expression) =>
-        EqualTo(map.getOrElse(left, left), right)
+    }
+  }
+
+  /**
+   * This is to handle INTERSECT/EXCEPT DISTINCT which are rewritten to left 
semi/anti join in
+   * ReplaceIntersectWithSemiJoin and ReplaceExceptWithAntiJoin.
+   *
+   * To rewrite the domain join on the right side, we need to remap the 
attributes in the domain
+   * join cond, using the mapping between left and right sides in the 
semi/anti join cond.
+   *
+   * After DecorrelateInnerQuery, the domain join conds reference the output 
names of the
+   * INTERSECT/EXCEPT, which come from the left side. When rewriting the 
DomainJoin in the
+   * right child, we need to remap the domain attribute names to account for 
the different
+   * names in the left vs right child, similar to 
pushDomainConditionsThroughSetOperation.
+   * But after the rewrite to semi/anti join is performed, we instead need to 
do the remapping
+   * based on the semi/anti join cond which contains equi-joins between the 
left and right
+   * outputs.
+   *
+   * Example: Take a query like:
+   * select * from t0 join lateral (
+   *   select a from t1 where b < t0.x
+   *   intersect distinct
+   *   select b from t2 where c < t0.y)
+   *
+   * over tables t0(x, y), t1(a), t2(b).
+   *
+   * Step 1 (this is the same as the Union case described above):
+   * After DecorrelateInnerQuery runs to introduce DomainJoins,
+   * we have outer table t0 with attributes [x#1, y#2] and the subquery is an 
Intersect where
+   * - the left side has DomainJoin [t0.x#4, t0.y#5] and output [t1.a#3, 
t0.x#4, t0.y#5]
+   * - the right side has DomainJoin [t0.x#7, t0.y#8] and output [t2.b#6, 
t0.x#7, t0.y#8]
+   * Here all the x and y attributes are from t0, but they are different 
instances from
+   * different joins of t0.
+   *
+   * The domain join conditions are x#4 <=> x#1 and y#5 <=> y#2, i.e. it joins 
the attributes from
+   * the original outer table with the attributes coming out of the DomainJoin 
of the left side,
+   * because the output of a set op uses the attribute names from the left 
side.
+   *
+   * Step 2:
+   * ReplaceIntersectWithSemiJoin runs and transforms the Intersect to
+   * Join LeftSemi, (((a#3 <=> b#6) AND (x#4 <=> x#7)) AND (y#5 <=> y#8))
+   * with equi-joins between the left and right outputs.
+   * For EXCEPT DISTINCT the same thing happens but with anti join in 
ReplaceExceptWithAntiJoin.
+   *
+   * Step 3:
+   * rewriteDomainJoins runs, in which we arrive at this function, which uses 
the
+   * semijoin condition to construct the domain join cond remapping for the 
right side:
+   * x#7 <=> x#1 and y#8 <=> y#2. These new conds together with the original 
domain join cond are
+   * used to rewrite the DomainJoins.
+   *
+   * Note: This logic only applies to INTERSECT/EXCEPT DISTINCT. For 
INTERSECT/EXCEPT ALL,
+   * step 1 is the same but instead of step 2, RewriteIntersectAll or 
RewriteExceptAll
+   * replace the logical Intersect/Except operator with a combination of
+   * Union, Aggregate, and Generate. Then the DomainJoin conds will go through
+   * pushDomainConditionsThroughSetOperation, not this function.
+   */
+  def pushDomainConditionsThroughSemiAntiJoin(
+      domainJoinConditions: Seq[Expression],
+      join: Join
+  ) : Seq[Expression] = {
+    if (join.condition.isDefined
+      && SQLConf.get.getConf(SQLConf.DECORRELATE_SET_OPS_ENABLED)) {
+      // The domain conds will be like leftInner <=> outer, and the semi/anti 
join cond will be like
+      // leftInner <=> rightInner. We add additional domain conds rightInner 
<=> outer which are
+      // used to rewrite the right-side DomainJoin.
+      val transitiveConds = 
splitConjunctivePredicates(join.condition.get).collect {
+        case EqualNullSafe(joinLeft: Attribute, joinRight: Attribute) =>
+          domainJoinConditions.collect {
+            case EqualNullSafe(domainInner: Attribute, domainOuter: Expression)
+              if domainInner.semanticEquals(joinLeft) =>
+              EqualNullSafe(joinRight, domainOuter)
+          }
+      }.flatten
+      domainJoinConditions ++ transitiveConds
+    } else {
+      domainJoinConditions
     }
   }
 
@@ -336,9 +440,18 @@ object DecorrelateInnerQuery extends PredicateHelper {
         throw new IllegalStateException(
           s"Unable to rewrite domain join with conditions: $conditions\n$d.")
       }
-    case u: Union =>
-      u.mapChildren { child =>
-        rewriteDomainJoins(outerPlan, child, 
pushConditionsThroughUnion(conditions, u, child))
+    case s @ (_ : Union | _: SetOperation) =>
+      // Remap the domain attributes for the children of the set op - see 
comments on the function.
+      s.mapChildren { child =>
+        rewriteDomainJoins(outerPlan, child,
+          pushDomainConditionsThroughSetOperation(conditions, s, child))
+      }
+    case j: Join if j.joinType == LeftSemi || j.joinType == LeftAnti =>
+      // For the INTERSECT/EXCEPT DISTINCT case, the set op is rewritten to a 
semi/anti join and we
+      // need to remap the domain attributes for the right child - see 
comments on the function.
+      j.mapChildren { child =>
+        rewriteDomainJoins(outerPlan, child,
+          pushDomainConditionsThroughSemiAntiJoin(conditions, j))
       }
     case p: LogicalPlan =>
       p.mapChildren(rewriteDomainJoins(outerPlan, _, conditions))
@@ -723,7 +836,7 @@ object DecorrelateInnerQuery extends PredicateHelper {
             val newJoin = j.copy(left = newLeft, right = newRight, condition = 
newCondition)
             (newJoin, newJoinCond, newOuterReferenceMap)
 
-          case u: Union =>
+          case s @ (_ : Union | _: SetOperation) =>
             // Set ops are decorrelated by pushing the domain join into each 
child. For details see
             // 
https://docs.google.com/document/d/11b9ClCF2jYGU7vU2suOT7LRswYkg6tZ8_6xJbvxfh2I/edit
 
@@ -739,16 +852,16 @@ object DecorrelateInnerQuery extends PredicateHelper {
             //   select c, t_outer.a, t_outer.b from t1 join t_outer where 
t1.a = t_outer.a
             //   UNION ALL
             //   select c, t_outer.a, t_outer.b from t2 join t_outer where 
t2.b = t_outer.b
-            val collectedChildOuterReferences = 
collectOuterReferencesInPlanTree(u)
+            val collectedChildOuterReferences = 
collectOuterReferencesInPlanTree(s)
             val newOuterReferences = AttributeSet(
               parentOuterReferences ++ collectedChildOuterReferences)
 
             val childDecorrelateResults =
-              u.children.map { child =>
+              s.children.map { child =>
                 val (decorrelatedChild, newJoinCond, newOuterReferenceMap) =
                   decorrelate(child, newOuterReferences, aggregated, 
underSetOp = true)
                 // Create a Project to ensure that the domain attributes are 
added to the same
-                // positions in each child of the union. If we don't 
explicitly construct this
+                // positions in each child of the set op. If we don't 
explicitly construct this
                 // Project, they could get added at the beginning or the end 
of the output columns
                 // depending on the child plan.
                 // The inner expressions for the domain are the values of 
newOuterReferenceMap.
@@ -762,7 +875,7 @@ object DecorrelateInnerQuery extends PredicateHelper {
             // names are from the first child
             val newJoinCond = childDecorrelateResults.head._2
             val newOuterReferenceMap = 
AttributeMap(childDecorrelateResults.head._3)
-            (u.withNewChildren(newChildren), newJoinCond, newOuterReferenceMap)
+            (s.withNewChildren(newChildren), newJoinCond, newOuterReferenceMap)
 
           case g: Generate if g.requiredChildOutput.isEmpty =>
             // Generate with non-empty required child output cannot host
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
index 1495505fb14..304f7de4c6a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
@@ -32,9 +32,13 @@ class DecorrelateInnerQuerySuite extends PlanTest {
   val x = AttributeReference("x", IntegerType)()
   val y = AttributeReference("y", IntegerType)()
   val z = AttributeReference("z", IntegerType)()
+  val a3 = AttributeReference("a3", IntegerType)()
+  val b3 = AttributeReference("b3", IntegerType)()
+  val c3 = AttributeReference("c3", IntegerType)()
   val t0 = OneRowRelation()
   val testRelation = LocalRelation(a, b, c)
   val testRelation2 = LocalRelation(x, y, z)
+  val testRelation3 = LocalRelation(a3, b3, c3)
 
   private def hasOuterReferences(plan: LogicalPlan): Boolean = {
     plan.exists(_.expressions.exists(SubExprUtils.containsOuter))
@@ -283,7 +287,7 @@ class DecorrelateInnerQuerySuite extends PlanTest {
     check(innerPlan, outerPlan, correctAnswer, Seq(y <=> y, x === a, y === z))
   }
 
-  test("union in correlation path") {
+  test("SPARK-36124: union in correlation path") {
     val outerPlan = testRelation2
     val innerPlan =
       Union(
@@ -305,7 +309,7 @@ class DecorrelateInnerQuerySuite extends PlanTest {
     check(innerPlan, outerPlan, correctAnswer, Seq(x <=> x, y <=> y))
   }
 
-  test("another union in correlation path") {
+  test("SPARK-36124: another union in correlation path") {
     val outerPlan = testRelation2
     val innerPlan =
       Union(Seq(
@@ -333,6 +337,110 @@ class DecorrelateInnerQuerySuite extends PlanTest {
     check(innerPlan, outerPlan, correctAnswer, Seq(x <=> x, y <=> y, z <=> z))
   }
 
+  test("SPARK-36124: INTERSECT ALL in correlation path") {
+    val outerPlan = testRelation2
+    val innerPlan =
+      Intersect(
+        Filter(And(OuterReference(x) === a, c === 3),
+          testRelation),
+        Filter(And(OuterReference(y) === b3, c3 === 6),
+          testRelation3),
+        isAll = true)
+    val x2 = x.newInstance()
+    val y2 = y.newInstance()
+    val correctAnswer =
+      Intersect(
+        Project(Seq(a, b, c, x, y),
+          Filter(And(x === a, c === 3),
+            DomainJoin(Seq(x, y),
+              testRelation))),
+        Project(Seq(a3, b3, c3, x2, y2),
+          Filter(And(y2 === b3, c3 === 6),
+            DomainJoin(Seq(x2, y2),
+              testRelation3))),
+        isAll = true
+      )
+    check(innerPlan, outerPlan, correctAnswer, Seq(x <=> x, y <=> y))
+  }
+
+    test("SPARK-36124: INTERSECT DISTINCT in correlation path") {
+    val outerPlan = testRelation2
+    val innerPlan =
+      Intersect(
+        Filter(And(OuterReference(x) === a, c === 3),
+          testRelation),
+        Filter(And(OuterReference(y) === b3, c3 === 6),
+          testRelation3),
+        isAll = false)
+    val x2 = x.newInstance()
+    val y2 = y.newInstance()
+    val correctAnswer =
+      Intersect(
+        Project(Seq(a, b, c, x, y),
+          Filter(And(x === a, c === 3),
+            DomainJoin(Seq(x, y),
+              testRelation))),
+        Project(Seq(a3, b3, c3, x2, y2),
+          Filter(And(y2 === b3, c3 === 6),
+            DomainJoin(Seq(x2, y2),
+              testRelation3))),
+        isAll = false
+      )
+    check(innerPlan, outerPlan, correctAnswer, Seq(x <=> x, y <=> y))
+  }
+
+  test("SPARK-36124: EXCEPT ALL in correlation path") {
+    val outerPlan = testRelation2
+    val innerPlan =
+      Except(
+        Filter(And(OuterReference(x) === a, c === 3),
+          testRelation),
+        Filter(And(OuterReference(y) === b3, c3 === 6),
+          testRelation3),
+        isAll = true)
+    val x2 = x.newInstance()
+    val y2 = y.newInstance()
+    val correctAnswer =
+      Except(
+        Project(Seq(a, b, c, x, y),
+          Filter(And(x === a, c === 3),
+            DomainJoin(Seq(x, y),
+              testRelation))),
+        Project(Seq(a3, b3, c3, x2, y2),
+          Filter(And(y2 === b3, c3 === 6),
+            DomainJoin(Seq(x2, y2),
+              testRelation3))),
+        isAll = true
+      )
+    check(innerPlan, outerPlan, correctAnswer, Seq(x <=> x, y <=> y))
+  }
+
+  test("SPARK-36124: EXCEPT DISTINCT in correlation path") {
+    val outerPlan = testRelation2
+    val innerPlan =
+      Except(
+        Filter(And(OuterReference(x) === a, c === 3),
+          testRelation),
+        Filter(And(OuterReference(y) === b3, c3 === 6),
+          testRelation3),
+        isAll = false)
+    val x2 = x.newInstance()
+    val y2 = y.newInstance()
+    val correctAnswer =
+      Except(
+        Project(Seq(a, b, c, x, y),
+          Filter(And(x === a, c === 3),
+            DomainJoin(Seq(x, y),
+              testRelation))),
+        Project(Seq(a3, b3, c3, x2, y2),
+          Filter(And(y2 === b3, c3 === 6),
+            DomainJoin(Seq(x2, y2),
+              testRelation3))),
+        isAll = false
+      )
+    check(innerPlan, outerPlan, correctAnswer, Seq(x <=> x, y <=> y))
+  }
+
   test("SPARK-38155: distinct with non-equality correlated predicates") {
     val outerPlan = testRelation2
     val innerPlan =
diff --git a/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql 
b/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
index 720abbb69b6..408a152d9b8 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
@@ -203,6 +203,44 @@ SELECT * FROM t1 JOIN LATERAL
   FROM   t4
   WHERE  t4.c1 > t1.c2);
 
+-- INTERSECT
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1
+  INTERSECT ALL
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 = t1.c1);
+
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1
+  INTERSECT DISTINCT
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 > t1.c2);
+
+-- EXCEPT
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1
+  EXCEPT ALL
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 = t1.c1);
+
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1
+  EXCEPT DISTINCT
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 > t1.c2);
+
 -- COUNT bug with UNION in subquery
 SELECT * FROM t1 JOIN LATERAL
   (SELECT COUNT(t2.c2)
@@ -239,6 +277,46 @@ SELECT * FROM t1 JOIN LATERAL
   SELECT t4.c2
   FROM   t4);
 
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1 and t2.c2 >= t1.c2
+  UNION DISTINCT
+  SELECT t4.c2
+  FROM   t4);
+
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1 and t2.c2 >= t1.c2
+  INTERSECT ALL
+  SELECT t4.c2
+  FROM   t4);
+  
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1 and t2.c2 >= t1.c2
+  INTERSECT DISTINCT
+  SELECT t4.c2
+  FROM   t4);
+
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1 and t2.c2 >= t1.c2
+  EXCEPT ALL
+  SELECT t4.c2
+  FROM   t4);
+  
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1 and t2.c2 >= t1.c2
+  EXCEPT DISTINCT
+  SELECT t4.c2
+  FROM   t4);
+
 -- Correlation under group by
 SELECT * FROM t1 JOIN LATERAL
   (SELECT t2.c2
@@ -272,28 +350,88 @@ SELECT * FROM t1 JOIN LATERAL
   FROM   t4);
 
 -- lateral join under union
-(SELECT * FROM t1 JOIN LATERAL (SELECT * FROM t2 WHERE t2.c1 = t1.c1))
+SELECT * FROM t1 JOIN LATERAL (SELECT * FROM t2 WHERE t2.c1 = t1.c1)
 UNION ALL
-(SELECT * FROM t1 JOIN t4);
+SELECT * FROM t1 JOIN t4;
 
 -- union above and below lateral join
-(SELECT * FROM t1 JOIN LATERAL
+SELECT * FROM t1 JOIN LATERAL
   (SELECT t2.c2
   FROM   t2
   WHERE  t2.c1 = t1.c1
   UNION ALL
   SELECT t4.c2
   FROM   t4
-  WHERE  t4.c1 = t1.c1))
+  WHERE  t4.c1 = t1.c1)
 UNION ALL
-(SELECT * FROM t2 JOIN LATERAL
+SELECT * FROM t2 JOIN LATERAL
   (SELECT t1.c2
   FROM   t1
   WHERE  t2.c1 <= t1.c1
   UNION ALL
   SELECT t4.c2
   FROM   t4
-  WHERE  t4.c1 < t2.c1));
+  WHERE  t4.c1 < t2.c1);
+
+-- Combinations of set ops
+SELECT * FROM t1 JOIN LATERAL
+  ((SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1
+  EXCEPT DISTINCT
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 > t1.c2)
+  UNION DISTINCT
+  (SELECT t4.c1
+  FROM   t4
+  WHERE  t4.c1 <= t1.c2
+  INTERSECT ALL
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 <> t1.c1)
+);
+
+SELECT * FROM t1 JOIN LATERAL
+  ((SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1
+  UNION ALL
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 > t1.c2)
+  INTERSECT DISTINCT
+  (SELECT t4.c1
+  FROM   t4
+  WHERE  t4.c1 <= t1.c2
+  EXCEPT ALL
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 <> t1.c1)
+);
+
+-- Semi join with correlation on left side - supported
+SELECT * FROM t1 JOIN LATERAL (SELECT sum(c1) FROM
+  (SELECT *
+  FROM   t2
+  WHERE  t2.c1 <= t1.c1) lhs
+  LEFT SEMI JOIN
+  (SELECT *
+  FROM   t4) rhs
+  ON lhs.c1 <=> rhs.c1 and lhs.c2 <=> rhs.c2
+);
+
+-- Semi join with correlation on right side - unsupported
+SELECT * FROM t1 JOIN LATERAL (SELECT sum(c1) FROM
+  (SELECT *
+  FROM   t2
+  WHERE  t2.c1 <= t1.c1) lhs
+  LEFT SEMI JOIN
+  (SELECT *
+  FROM   t4
+  WHERE t4.c1 > t1.c2) rhs
+  ON lhs.c1 <=> rhs.c1 and lhs.c2 <=> rhs.c2
+);
 
 -- SPARK-41961: lateral join with table-valued functions
 SELECT * FROM LATERAL EXPLODE(ARRAY(1, 2));
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-joins-and-set-ops.sql
 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-joins-and-set-ops.sql
index 81d47a798fc..28cb2f74748 100644
--- 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-joins-and-set-ops.sql
+++ 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-joins-and-set-ops.sql
@@ -238,23 +238,83 @@ WHERE  EXISTS (SELECT *
                  WHERE  dept_id >= 30 
                         AND dept_id <= 50);
 
--- Correlated predicates under UNION - unsupported
-SELECT * 
-FROM   emp 
-WHERE  EXISTS (SELECT * 
-               FROM   dept 
+-- Correlated predicates under set ops - unsupported
+SELECT *
+FROM   emp
+WHERE  EXISTS (SELECT *
+               FROM   dept
                WHERE  dept_id = emp.dept_id and state = "CA"
-               UNION 
-               SELECT * 
-               FROM   dept 
+               UNION
+               SELECT *
+               FROM   dept
                WHERE  dept_id = emp.dept_id and state = "TX");
 
-SELECT * 
-FROM   emp 
-WHERE NOT EXISTS (SELECT * 
-               FROM   dept 
+SELECT *
+FROM   emp
+WHERE NOT EXISTS (SELECT *
+               FROM   dept
                WHERE  dept_id = emp.dept_id and state = "CA"
-               UNION 
+               UNION
+               SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "TX");
+
+SELECT *
+FROM   emp
+WHERE  EXISTS (SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "CA"
+               INTERSECT ALL
+               SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "TX");
+
+SELECT *
+FROM   emp
+WHERE EXISTS (SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "CA"
+               INTERSECT DISTINCT
+               SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "TX");
+
+SELECT *
+FROM   emp
+WHERE  EXISTS (SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "CA"
+               EXCEPT ALL
+               SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "TX");
+
+SELECT *
+FROM   emp
+WHERE  EXISTS (SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "CA"
+               EXCEPT DISTINCT
+               SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "TX");
+
+SELECT *
+FROM   emp
+WHERE NOT EXISTS (SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "CA"
+               INTERSECT ALL
+               SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "TX");
+
+SELECT *
+FROM   emp
+WHERE NOT EXISTS (SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "CA"
+               EXCEPT DISTINCT
                SELECT * 
                FROM   dept 
                WHERE  dept_id = emp.dept_id and state = "TX");
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-set-operations.sql
 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-set-operations.sql
index b81dd7dce7f..e4a931ce2c8 100644
--- 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-set-operations.sql
+++ 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-set-operations.sql
@@ -470,3 +470,112 @@ HAVING   t1b NOT IN
                 FROM   t3)
 ORDER BY t1c DESC NULLS LAST, t1i;
 
+-- Correlated set ops inside IN - unsupported
+
+SELECT *
+FROM   t1
+WHERE  t1a IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               UNION ALL
+               SELECT t3a
+               FROM   t3);
+
+SELECT *
+FROM   t1
+WHERE  t1a IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               UNION DISTINCT
+               SELECT t3a
+               FROM   t3);
+
+SELECT *
+FROM   t1
+WHERE  t1a IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               INTERSECT ALL
+               SELECT t3a
+               FROM   t3);
+
+SELECT *
+FROM   t1
+WHERE  t1a IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               INTERSECT DISTINCT
+               SELECT t3a
+               FROM   t3);
+
+SELECT *
+FROM   t1
+WHERE  t1a IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               EXCEPT ALL
+               SELECT t3a
+               FROM   t3);
+
+SELECT *
+FROM   t1
+WHERE  t1a IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               EXCEPT DISTINCT
+               SELECT t3a
+               FROM   t3);
+
+SELECT *
+FROM   t1
+WHERE  t1a NOT IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               UNION ALL
+               SELECT t3a
+               FROM   t3);
+
+SELECT *
+FROM   t1
+WHERE  t1a NOT IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               UNION DISTINCT
+               SELECT t3a
+               FROM   t3);
+
+SELECT *
+FROM   t1
+WHERE  t1a NOT IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               INTERSECT ALL
+               SELECT t3a
+               FROM   t3);
+
+SELECT *
+FROM   t1
+WHERE  t1a NOT IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               INTERSECT DISTINCT
+               SELECT t3a
+               FROM   t3);
+
+SELECT *
+FROM   t1
+WHERE  t1a NOT IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               EXCEPT ALL
+               SELECT t3a
+               FROM   t3);
+
+SELECT *
+FROM   t1
+WHERE  t1a NOT IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               EXCEPT DISTINCT
+               SELECT t3a
+               FROM   t3);
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-select.sql
 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-select.sql
index 10f8f190fd8..6d673f149cc 100644
--- 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-select.sql
+++ 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-select.sql
@@ -235,108 +235,3 @@ SELECT c, (
     FROM (VALUES (0, 6), (1, 5), (2, 4), (3, 3)) t1(a, b)
     WHERE a + b = c
 ) FROM (VALUES (6)) t2(c);
-
--- Set operations in correlation path
-
-CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0);
-CREATE OR REPLACE TEMP VIEW t1(t1a, t1b, t1c) AS VALUES (1, 1, 3);
-CREATE OR REPLACE TEMP VIEW t2(t2a, t2b, t2c) AS VALUES (1, 1, 5), (2, 2, 7);
-
-SELECT t0a, (SELECT sum(c) FROM
-  (SELECT t1c as c
-  FROM   t1
-  WHERE  t1a = t0a
-  UNION ALL
-  SELECT t2c as c
-  FROM   t2
-  WHERE  t2b = t0b)
-)
-FROM t0;
-
-SELECT t0a, (SELECT sum(c) FROM
-  (SELECT t1c as c
-  FROM   t1
-  WHERE  t1a = t0a
-  UNION ALL
-  SELECT t2c as c
-  FROM   t2
-  WHERE  t2a = t0a)
-)
-FROM t0;
-
-SELECT t0a, (SELECT sum(c) FROM
-  (SELECT t1c as c
-  FROM   t1
-  WHERE  t1a > t0a
-  UNION ALL
-  SELECT t2c as c
-  FROM   t2
-  WHERE  t2b <= t0b)
-)
-FROM t0;
-
-SELECT t0a, (SELECT sum(t1c) FROM
-  (SELECT t1c
-  FROM   t1
-  WHERE  t1a = t0a
-  UNION ALL
-  SELECT t2c
-  FROM   t2
-  WHERE  t2b = t0b)
-)
-FROM t0;
-
-SELECT t0a, (SELECT sum(t1c) FROM
-  (SELECT t1c
-  FROM   t1
-  WHERE  t1a = t0a
-  UNION DISTINCT
-  SELECT t2c
-  FROM   t2
-  WHERE  t2b = t0b)
-)
-FROM t0;
-
--- Tests for column aliasing
-SELECT t0a, (SELECT sum(t1a + 3 * t1b + 5 * t1c) FROM
-  (SELECT t1c as t1a, t1a as t1b, t0a as t1c
-  FROM   t1
-  WHERE  t1a = t0a
-  UNION ALL
-  SELECT t0a as t2b, t2c as t1a, t0b as t2c
-  FROM   t2
-  WHERE  t2b = t0b)
-)
-FROM t0;
-
--- Test handling of COUNT bug
-SELECT t0a, (SELECT count(t1c) FROM
-  (SELECT t1c
-  FROM   t1
-  WHERE  t1a = t0a
-  UNION DISTINCT
-  SELECT t2c
-  FROM   t2
-  WHERE  t2b = t0b)
-)
-FROM t0;
-
--- Correlated references in project
-SELECT t0a, (SELECT sum(d) FROM
-  (SELECT t1a - t0a as d
-  FROM   t1
-  UNION ALL
-  SELECT t2a - t0a as d
-  FROM   t2)
-)
-FROM t0;
-
--- Correlated references in aggregate - unsupported
-SELECT t0a, (SELECT sum(d) FROM
-  (SELECT sum(t0a) as d
-  FROM   t1
-  UNION ALL
-  SELECT sum(t2a) + t0a as d
-  FROM   t2)
-)
-FROM t0;
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-set-op.sql
 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-set-op.sql
new file mode 100644
index 00000000000..8f03f7e4100
--- /dev/null
+++ 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-set-op.sql
@@ -0,0 +1,621 @@
+-- Set operations in correlation path
+
+CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0);
+CREATE OR REPLACE TEMP VIEW t1(t1a, t1b, t1c) AS VALUES (1, 1, 3);
+CREATE OR REPLACE TEMP VIEW t2(t2a, t2b, t2c) AS VALUES (1, 1, 5), (2, 2, 7);
+
+
+-- UNION ALL
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+);
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2a = t0a)
+)
+FROM t0;
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a > t0a
+  UNION ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b <= t0b)
+)
+FROM t0;
+
+SELECT t0a, (SELECT sum(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION ALL
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Tests for column aliasing
+SELECT t0a, (SELECT sum(t1a + 3 * t1b + 5 * t1c) FROM
+  (SELECT t1c as t1a, t1a as t1b, t0a as t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION ALL
+  SELECT t0a as t2b, t2c as t1a, t0b as t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Test handling of COUNT bug
+SELECT t0a, (SELECT count(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION ALL
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Correlated references in project
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT t1a - t0a as d
+  FROM   t1
+  UNION ALL
+  SELECT t2a - t0a as d
+  FROM   t2)
+)
+FROM t0;
+
+-- Correlated references in aggregate - unsupported
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT sum(t0a) as d
+  FROM   t1
+  UNION ALL
+  SELECT sum(t2a) + t0a as d
+  FROM   t2)
+)
+FROM t0;
+
+
+
+-- UNION DISTINCT
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+);
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2a = t0a)
+)
+FROM t0;
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a > t0a
+  UNION DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b <= t0b)
+)
+FROM t0;
+
+SELECT t0a, (SELECT sum(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION DISTINCT
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Tests for column aliasing
+SELECT t0a, (SELECT sum(t1a + 3 * t1b + 5 * t1c) FROM
+  (SELECT t1c as t1a, t1a as t1b, t0a as t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION DISTINCT
+  SELECT t0a as t2b, t2c as t1a, t0b as t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Test handling of COUNT bug
+SELECT t0a, (SELECT count(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION DISTINCT
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Correlated references in project
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT t1a - t0a as d
+  FROM   t1
+  UNION DISTINCT
+  SELECT t2a - t0a as d
+  FROM   t2)
+)
+FROM t0;
+
+-- Correlated references in aggregate - unsupported
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT sum(t0a) as d
+  FROM   t1
+  UNION DISTINCT
+  SELECT sum(t2a) + t0a as d
+  FROM   t2)
+)
+FROM t0;
+
+
+-- INTERSECT ALL
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+);
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2a = t0a)
+)
+FROM t0;
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a > t0a
+  INTERSECT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b <= t0b)
+)
+FROM t0;
+
+SELECT t0a, (SELECT sum(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT ALL
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Tests for column aliasing
+SELECT t0a, (SELECT sum(t1a + 3 * t1b + 5 * t1c) FROM
+  (SELECT t1c as t1a, t1a as t1b, t0a as t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT ALL
+  SELECT t0a as t2b, t2c as t1a, t0b as t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Test handling of COUNT bug
+SELECT t0a, (SELECT count(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT ALL
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Correlated references in project
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT t1a - t0a as d
+  FROM   t1
+  INTERSECT ALL
+  SELECT t2a - t0a as d
+  FROM   t2)
+)
+FROM t0;
+
+-- Correlated references in aggregate - unsupported
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT sum(t0a) as d
+  FROM   t1
+  INTERSECT ALL
+  SELECT sum(t2a) + t0a as d
+  FROM   t2)
+)
+FROM t0;
+
+
+
+-- INTERSECT DISTINCT
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+);
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2a = t0a)
+)
+FROM t0;
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a > t0a
+  INTERSECT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b <= t0b)
+)
+FROM t0;
+
+SELECT t0a, (SELECT sum(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT DISTINCT
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Tests for column aliasing
+SELECT t0a, (SELECT sum(t1a + 3 * t1b + 5 * t1c) FROM
+  (SELECT t1c as t1a, t1a as t1b, t0a as t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT DISTINCT
+  SELECT t0a as t2b, t2c as t1a, t0b as t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Test handling of COUNT bug
+SELECT t0a, (SELECT count(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT DISTINCT
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Correlated references in project
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT t1a - t0a as d
+  FROM   t1
+  INTERSECT DISTINCT
+  SELECT t2a - t0a as d
+  FROM   t2)
+)
+FROM t0;
+
+-- Correlated references in aggregate - unsupported
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT sum(t0a) as d
+  FROM   t1
+  INTERSECT DISTINCT
+  SELECT sum(t2a) + t0a as d
+  FROM   t2)
+)
+FROM t0;
+
+
+
+-- EXCEPT ALL
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+);
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2a = t0a)
+)
+FROM t0;
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a > t0a
+  EXCEPT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b <= t0b)
+)
+FROM t0;
+
+SELECT t0a, (SELECT sum(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT ALL
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Tests for column aliasing
+SELECT t0a, (SELECT sum(t1a + 3 * t1b + 5 * t1c) FROM
+  (SELECT t1c as t1a, t1a as t1b, t0a as t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT ALL
+  SELECT t0a as t2b, t2c as t1a, t0b as t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Test handling of COUNT bug
+SELECT t0a, (SELECT count(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT ALL
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Correlated references in project
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT t1a - t0a as d
+  FROM   t1
+  EXCEPT ALL
+  SELECT t2a - t0a as d
+  FROM   t2)
+)
+FROM t0;
+
+-- Correlated references in aggregate - unsupported
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT sum(t0a) as d
+  FROM   t1
+  EXCEPT ALL
+  SELECT sum(t2a) + t0a as d
+  FROM   t2)
+)
+FROM t0;
+
+
+
+-- EXCEPT DISTINCT
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+);
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2a = t0a)
+)
+FROM t0;
+
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a > t0a
+  EXCEPT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b <= t0b)
+)
+FROM t0;
+
+SELECT t0a, (SELECT sum(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT DISTINCT
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Tests for column aliasing
+SELECT t0a, (SELECT sum(t1a + 3 * t1b + 5 * t1c) FROM
+  (SELECT t1c as t1a, t1a as t1b, t0a as t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT DISTINCT
+  SELECT t0a as t2b, t2c as t1a, t0b as t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Test handling of COUNT bug
+SELECT t0a, (SELECT count(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT DISTINCT
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0;
+
+-- Correlated references in project
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT t1a - t0a as d
+  FROM   t1
+  EXCEPT DISTINCT
+  SELECT t2a - t0a as d
+  FROM   t2)
+)
+FROM t0;
+
+-- Correlated references in aggregate - unsupported
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT sum(t0a) as d
+  FROM   t1
+  EXCEPT DISTINCT
+  SELECT sum(t2a) + t0a as d
+  FROM   t2)
+)
+FROM t0;
diff --git a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out 
b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
index 9431714fecf..0324320a0e5 100644
--- a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
@@ -913,6 +913,67 @@ struct<c1:int,c2:int,c2:int>
 0      1       3
 
 
+-- !query
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1
+  INTERSECT ALL
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 = t1.c1)
+-- !query schema
+struct<c1:int,c2:int,c2:int>
+-- !query output
+0      1       2
+
+
+-- !query
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1
+  INTERSECT DISTINCT
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 > t1.c2)
+-- !query schema
+struct<c1:int,c2:int,c2:int>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1
+  EXCEPT ALL
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 = t1.c1)
+-- !query schema
+struct<c1:int,c2:int,c2:int>
+-- !query output
+0      1       3
+
+
+-- !query
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1
+  EXCEPT DISTINCT
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 > t1.c2)
+-- !query schema
+struct<c1:int,c2:int,c2:int>
+-- !query output
+0      1       2
+0      1       3
+
+
 -- !query
 SELECT * FROM t1 JOIN LATERAL
   (SELECT COUNT(t2.c2)
@@ -996,6 +1057,83 @@ struct<c1:int,c2:int,c2:int>
 1      2       3
 
 
+-- !query
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1 and t2.c2 >= t1.c2
+  UNION DISTINCT
+  SELECT t4.c2
+  FROM   t4)
+-- !query schema
+struct<c1:int,c2:int,c2:int>
+-- !query output
+0      1       1
+0      1       2
+0      1       3
+1      2       1
+1      2       2
+1      2       3
+
+
+-- !query
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1 and t2.c2 >= t1.c2
+  INTERSECT ALL
+  SELECT t4.c2
+  FROM   t4)
+-- !query schema
+struct<c1:int,c2:int,c2:int>
+-- !query output
+0      1       2
+0      1       3
+
+
+-- !query
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1 and t2.c2 >= t1.c2
+  INTERSECT DISTINCT
+  SELECT t4.c2
+  FROM   t4)
+-- !query schema
+struct<c1:int,c2:int,c2:int>
+-- !query output
+0      1       2
+0      1       3
+
+
+-- !query
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1 and t2.c2 >= t1.c2
+  EXCEPT ALL
+  SELECT t4.c2
+  FROM   t4)
+-- !query schema
+struct<c1:int,c2:int,c2:int>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM t1 JOIN LATERAL
+  (SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1 and t2.c2 >= t1.c2
+  EXCEPT DISTINCT
+  SELECT t4.c2
+  FROM   t4)
+-- !query schema
+struct<c1:int,c2:int,c2:int>
+-- !query output
+
+
+
 -- !query
 SELECT * FROM t1 JOIN LATERAL
   (SELECT t2.c2
@@ -1073,9 +1211,9 @@ org.apache.spark.sql.AnalysisException
 
 
 -- !query
-(SELECT * FROM t1 JOIN LATERAL (SELECT * FROM t2 WHERE t2.c1 = t1.c1))
+SELECT * FROM t1 JOIN LATERAL (SELECT * FROM t2 WHERE t2.c1 = t1.c1)
 UNION ALL
-(SELECT * FROM t1 JOIN t4)
+SELECT * FROM t1 JOIN t4
 -- !query schema
 struct<c1:int,c2:int,c1:int,c2:int>
 -- !query output
@@ -1092,23 +1230,23 @@ struct<c1:int,c2:int,c1:int,c2:int>
 
 
 -- !query
-(SELECT * FROM t1 JOIN LATERAL
+SELECT * FROM t1 JOIN LATERAL
   (SELECT t2.c2
   FROM   t2
   WHERE  t2.c1 = t1.c1
   UNION ALL
   SELECT t4.c2
   FROM   t4
-  WHERE  t4.c1 = t1.c1))
+  WHERE  t4.c1 = t1.c1)
 UNION ALL
-(SELECT * FROM t2 JOIN LATERAL
+SELECT * FROM t2 JOIN LATERAL
   (SELECT t1.c2
   FROM   t1
   WHERE  t2.c1 <= t1.c1
   UNION ALL
   SELECT t4.c2
   FROM   t4
-  WHERE  t4.c1 < t2.c1))
+  WHERE  t4.c1 < t2.c1)
 -- !query schema
 struct<c1:int,c2:int,c2:int>
 -- !query output
@@ -1124,6 +1262,105 @@ struct<c1:int,c2:int,c2:int>
 1      2       3
 
 
+-- !query
+SELECT * FROM t1 JOIN LATERAL
+  ((SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1
+  EXCEPT DISTINCT
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 > t1.c2)
+  UNION DISTINCT
+  (SELECT t4.c1
+  FROM   t4
+  WHERE  t4.c1 <= t1.c2
+  INTERSECT ALL
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 <> t1.c1)
+)
+-- !query schema
+struct<c1:int,c2:int,c2:int>
+-- !query output
+0      1       1
+0      1       2
+0      1       3
+1      2       1
+
+
+-- !query
+SELECT * FROM t1 JOIN LATERAL
+  ((SELECT t2.c2
+  FROM   t2
+  WHERE  t2.c1 = t1.c1
+  UNION ALL
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 > t1.c2)
+  INTERSECT DISTINCT
+  (SELECT t4.c1
+  FROM   t4
+  WHERE  t4.c1 <= t1.c2
+  EXCEPT ALL
+  SELECT t4.c2
+  FROM   t4
+  WHERE  t4.c1 <> t1.c1)
+)
+-- !query schema
+struct<c1:int,c2:int,c2:int>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM t1 JOIN LATERAL (SELECT sum(c1) FROM
+  (SELECT *
+  FROM   t2
+  WHERE  t2.c1 <= t1.c1) lhs
+  LEFT SEMI JOIN
+  (SELECT *
+  FROM   t4) rhs
+  ON lhs.c1 <=> rhs.c1 and lhs.c2 <=> rhs.c2
+)
+-- !query schema
+struct<c1:int,c2:int,sum(c1):bigint>
+-- !query output
+0      1       0
+1      2       0
+
+
+-- !query
+SELECT * FROM t1 JOIN LATERAL (SELECT sum(c1) FROM
+  (SELECT *
+  FROM   t2
+  WHERE  t2.c1 <= t1.c1) lhs
+  LEFT SEMI JOIN
+  (SELECT *
+  FROM   t4
+  WHERE t4.c1 > t1.c2) rhs
+  ON lhs.c1 <=> rhs.c1 and lhs.c2 <=> rhs.c2
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter (c1#x > outer(c2#x))\n+- SubqueryAlias 
spark_catalog.default.t4\n   +- View (`spark_catalog`.`default`.`t4`, 
[c1#x,c2#x])\n      +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as 
int) AS c2#x]\n         +- LocalRelation [col1#x, col2#x]\n"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 125,
+    "stopIndex" : 166,
+    "fragment" : "SELECT *\n  FROM   t4\n  WHERE t4.c1 > t1.c2"
+  } ]
+}
+
+
 -- !query
 SELECT * FROM LATERAL EXPLODE(ARRAY(1, 2))
 -- !query schema
diff --git 
a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-joins-and-set-ops.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-joins-and-set-ops.sql.out
index 8886872bfb4..62f7874a442 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-joins-and-set-ops.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-joins-and-set-ops.sql.out
@@ -361,14 +361,14 @@ 
struct<id:int,emp_name:string,hiredate:date,salary:double,dept_id:int>
 
 
 -- !query
-SELECT * 
-FROM   emp 
-WHERE  EXISTS (SELECT * 
-               FROM   dept 
+SELECT *
+FROM   emp
+WHERE  EXISTS (SELECT *
+               FROM   dept
                WHERE  dept_id = emp.dept_id and state = "CA"
-               UNION 
-               SELECT * 
-               FROM   dept 
+               UNION
+               SELECT *
+               FROM   dept
                WHERE  dept_id = emp.dept_id and state = "TX")
 -- !query schema
 struct<>
@@ -383,20 +383,200 @@ org.apache.spark.sql.AnalysisException
   "queryContext" : [ {
     "objectType" : "",
     "objectName" : "",
-    "startIndex" : 38,
-    "stopIndex" : 135,
-    "fragment" : "SELECT * \n               FROM   dept \n               WHERE 
 dept_id = emp.dept_id and state = \"CA\""
+    "startIndex" : 36,
+    "stopIndex" : 131,
+    "fragment" : "SELECT *\n               FROM   dept\n               WHERE  
dept_id = emp.dept_id and state = \"CA\""
   } ]
 }
 
 
 -- !query
-SELECT * 
-FROM   emp 
-WHERE NOT EXISTS (SELECT * 
-               FROM   dept 
+SELECT *
+FROM   emp
+WHERE NOT EXISTS (SELECT *
+               FROM   dept
                WHERE  dept_id = emp.dept_id and state = "CA"
-               UNION 
+               UNION
+               SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "TX")
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter ((dept_id#x = outer(dept_id#x)) AND (state#x = 
CA))\n+- SubqueryAlias dept\n   +- View (`DEPT`, 
[dept_id#x,dept_name#x,state#x])\n      +- Project [cast(dept_id#x as int) AS 
dept_id#x, cast(dept_name#x as string) AS dept_name#x, cast(state#x as string) 
AS state#x]\n         +- Project [dept_id#x, dept_name#x, state#x]\n            
+- SubqueryAlias DEPT\n               +- LocalRelation [dept_id#x, dept_name#x, 
state#x]\n"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 39,
+    "stopIndex" : 134,
+    "fragment" : "SELECT *\n               FROM   dept\n               WHERE  
dept_id = emp.dept_id and state = \"CA\""
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   emp
+WHERE  EXISTS (SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "CA"
+               INTERSECT ALL
+               SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "TX")
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter ((dept_id#x = outer(dept_id#x)) AND (state#x = 
CA))\n+- SubqueryAlias dept\n   +- View (`DEPT`, 
[dept_id#x,dept_name#x,state#x])\n      +- Project [cast(dept_id#x as int) AS 
dept_id#x, cast(dept_name#x as string) AS dept_name#x, cast(state#x as string) 
AS state#x]\n         +- Project [dept_id#x, dept_name#x, state#x]\n            
+- SubqueryAlias DEPT\n               +- LocalRelation [dept_id#x, dept_name#x, 
state#x]\n"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 36,
+    "stopIndex" : 131,
+    "fragment" : "SELECT *\n               FROM   dept\n               WHERE  
dept_id = emp.dept_id and state = \"CA\""
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   emp
+WHERE EXISTS (SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "CA"
+               INTERSECT DISTINCT
+               SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "TX")
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter ((dept_id#x = outer(dept_id#x)) AND (state#x = 
CA))\n+- SubqueryAlias dept\n   +- View (`DEPT`, 
[dept_id#x,dept_name#x,state#x])\n      +- Project [cast(dept_id#x as int) AS 
dept_id#x, cast(dept_name#x as string) AS dept_name#x, cast(state#x as string) 
AS state#x]\n         +- Project [dept_id#x, dept_name#x, state#x]\n            
+- SubqueryAlias DEPT\n               +- LocalRelation [dept_id#x, dept_name#x, 
state#x]\n"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 35,
+    "stopIndex" : 130,
+    "fragment" : "SELECT *\n               FROM   dept\n               WHERE  
dept_id = emp.dept_id and state = \"CA\""
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   emp
+WHERE  EXISTS (SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "CA"
+               EXCEPT ALL
+               SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "TX")
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter ((dept_id#x = outer(dept_id#x)) AND (state#x = 
CA))\n+- SubqueryAlias dept\n   +- View (`DEPT`, 
[dept_id#x,dept_name#x,state#x])\n      +- Project [cast(dept_id#x as int) AS 
dept_id#x, cast(dept_name#x as string) AS dept_name#x, cast(state#x as string) 
AS state#x]\n         +- Project [dept_id#x, dept_name#x, state#x]\n            
+- SubqueryAlias DEPT\n               +- LocalRelation [dept_id#x, dept_name#x, 
state#x]\n"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 36,
+    "stopIndex" : 131,
+    "fragment" : "SELECT *\n               FROM   dept\n               WHERE  
dept_id = emp.dept_id and state = \"CA\""
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   emp
+WHERE  EXISTS (SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "CA"
+               EXCEPT DISTINCT
+               SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "TX")
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter ((dept_id#x = outer(dept_id#x)) AND (state#x = 
CA))\n+- SubqueryAlias dept\n   +- View (`DEPT`, 
[dept_id#x,dept_name#x,state#x])\n      +- Project [cast(dept_id#x as int) AS 
dept_id#x, cast(dept_name#x as string) AS dept_name#x, cast(state#x as string) 
AS state#x]\n         +- Project [dept_id#x, dept_name#x, state#x]\n            
+- SubqueryAlias DEPT\n               +- LocalRelation [dept_id#x, dept_name#x, 
state#x]\n"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 36,
+    "stopIndex" : 131,
+    "fragment" : "SELECT *\n               FROM   dept\n               WHERE  
dept_id = emp.dept_id and state = \"CA\""
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   emp
+WHERE NOT EXISTS (SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "CA"
+               INTERSECT ALL
+               SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "TX")
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter ((dept_id#x = outer(dept_id#x)) AND (state#x = 
CA))\n+- SubqueryAlias dept\n   +- View (`DEPT`, 
[dept_id#x,dept_name#x,state#x])\n      +- Project [cast(dept_id#x as int) AS 
dept_id#x, cast(dept_name#x as string) AS dept_name#x, cast(state#x as string) 
AS state#x]\n         +- Project [dept_id#x, dept_name#x, state#x]\n            
+- SubqueryAlias DEPT\n               +- LocalRelation [dept_id#x, dept_name#x, 
state#x]\n"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 39,
+    "stopIndex" : 134,
+    "fragment" : "SELECT *\n               FROM   dept\n               WHERE  
dept_id = emp.dept_id and state = \"CA\""
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   emp
+WHERE NOT EXISTS (SELECT *
+               FROM   dept
+               WHERE  dept_id = emp.dept_id and state = "CA"
+               EXCEPT DISTINCT
                SELECT * 
                FROM   dept 
                WHERE  dept_id = emp.dept_id and state = "TX")
@@ -413,8 +593,8 @@ org.apache.spark.sql.AnalysisException
   "queryContext" : [ {
     "objectType" : "",
     "objectName" : "",
-    "startIndex" : 41,
-    "stopIndex" : 138,
-    "fragment" : "SELECT * \n               FROM   dept \n               WHERE 
 dept_id = emp.dept_id and state = \"CA\""
+    "startIndex" : 39,
+    "stopIndex" : 134,
+    "fragment" : "SELECT *\n               FROM   dept\n               WHERE  
dept_id = emp.dept_id and state = \"CA\""
   } ]
 }
diff --git 
a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-set-operations.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-set-operations.sql.out
index 5c961c10e0f..8ac2ffdf96e 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-set-operations.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-set-operations.sql.out
@@ -590,3 +590,351 @@ struct<count(DISTINCT 
t1a):bigint,t1b:smallint,t1c:int,t1i:date>
 1      10      NULL    2014-08-04
 1      10      NULL    2014-09-04
 1      10      NULL    2015-05-04
+
+
+-- !query
+SELECT *
+FROM   t1
+WHERE  t1a IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               UNION ALL
+               SELECT t3a
+               FROM   t3)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter (t2b#x = outer(t1b#x))\n+- SubqueryAlias t2\n   +- 
View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n      +- 
Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, 
cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as 
float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS 
t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]\n       
  +- Project [t2a#x, t2b [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 35,
+    "stopIndex" : 100,
+    "fragment" : "SELECT t2a\n               FROM   t2\n               WHERE 
t2b = t1b"
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   t1
+WHERE  t1a IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               UNION DISTINCT
+               SELECT t3a
+               FROM   t3)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter (t2b#x = outer(t1b#x))\n+- SubqueryAlias t2\n   +- 
View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n      +- 
Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, 
cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as 
float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS 
t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]\n       
  +- Project [t2a#x, t2b [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 35,
+    "stopIndex" : 100,
+    "fragment" : "SELECT t2a\n               FROM   t2\n               WHERE 
t2b = t1b"
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   t1
+WHERE  t1a IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               INTERSECT ALL
+               SELECT t3a
+               FROM   t3)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter (t2b#x = outer(t1b#x))\n+- SubqueryAlias t2\n   +- 
View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n      +- 
Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, 
cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as 
float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS 
t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]\n       
  +- Project [t2a#x, t2b [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 35,
+    "stopIndex" : 100,
+    "fragment" : "SELECT t2a\n               FROM   t2\n               WHERE 
t2b = t1b"
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   t1
+WHERE  t1a IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               INTERSECT DISTINCT
+               SELECT t3a
+               FROM   t3)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter (t2b#x = outer(t1b#x))\n+- SubqueryAlias t2\n   +- 
View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n      +- 
Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, 
cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as 
float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS 
t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]\n       
  +- Project [t2a#x, t2b [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 35,
+    "stopIndex" : 100,
+    "fragment" : "SELECT t2a\n               FROM   t2\n               WHERE 
t2b = t1b"
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   t1
+WHERE  t1a IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               EXCEPT ALL
+               SELECT t3a
+               FROM   t3)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter (t2b#x = outer(t1b#x))\n+- SubqueryAlias t2\n   +- 
View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n      +- 
Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, 
cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as 
float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS 
t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]\n       
  +- Project [t2a#x, t2b [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 35,
+    "stopIndex" : 100,
+    "fragment" : "SELECT t2a\n               FROM   t2\n               WHERE 
t2b = t1b"
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   t1
+WHERE  t1a IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               EXCEPT DISTINCT
+               SELECT t3a
+               FROM   t3)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter (t2b#x = outer(t1b#x))\n+- SubqueryAlias t2\n   +- 
View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n      +- 
Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, 
cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as 
float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS 
t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]\n       
  +- Project [t2a#x, t2b [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 35,
+    "stopIndex" : 100,
+    "fragment" : "SELECT t2a\n               FROM   t2\n               WHERE 
t2b = t1b"
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   t1
+WHERE  t1a NOT IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               UNION ALL
+               SELECT t3a
+               FROM   t3)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter (t2b#x = outer(t1b#x))\n+- SubqueryAlias t2\n   +- 
View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n      +- 
Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, 
cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as 
float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS 
t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]\n       
  +- Project [t2a#x, t2b [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 39,
+    "stopIndex" : 104,
+    "fragment" : "SELECT t2a\n               FROM   t2\n               WHERE 
t2b = t1b"
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   t1
+WHERE  t1a NOT IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               UNION DISTINCT
+               SELECT t3a
+               FROM   t3)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter (t2b#x = outer(t1b#x))\n+- SubqueryAlias t2\n   +- 
View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n      +- 
Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, 
cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as 
float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS 
t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]\n       
  +- Project [t2a#x, t2b [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 39,
+    "stopIndex" : 104,
+    "fragment" : "SELECT t2a\n               FROM   t2\n               WHERE 
t2b = t1b"
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   t1
+WHERE  t1a NOT IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               INTERSECT ALL
+               SELECT t3a
+               FROM   t3)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter (t2b#x = outer(t1b#x))\n+- SubqueryAlias t2\n   +- 
View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n      +- 
Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, 
cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as 
float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS 
t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]\n       
  +- Project [t2a#x, t2b [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 39,
+    "stopIndex" : 104,
+    "fragment" : "SELECT t2a\n               FROM   t2\n               WHERE 
t2b = t1b"
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   t1
+WHERE  t1a NOT IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               INTERSECT DISTINCT
+               SELECT t3a
+               FROM   t3)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter (t2b#x = outer(t1b#x))\n+- SubqueryAlias t2\n   +- 
View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n      +- 
Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, 
cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as 
float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS 
t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]\n       
  +- Project [t2a#x, t2b [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 39,
+    "stopIndex" : 104,
+    "fragment" : "SELECT t2a\n               FROM   t2\n               WHERE 
t2b = t1b"
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   t1
+WHERE  t1a NOT IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               EXCEPT ALL
+               SELECT t3a
+               FROM   t3)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter (t2b#x = outer(t1b#x))\n+- SubqueryAlias t2\n   +- 
View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n      +- 
Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, 
cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as 
float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS 
t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]\n       
  +- Project [t2a#x, t2b [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 39,
+    "stopIndex" : 104,
+    "fragment" : "SELECT t2a\n               FROM   t2\n               WHERE 
t2b = t1b"
+  } ]
+}
+
+
+-- !query
+SELECT *
+FROM   t1
+WHERE  t1a NOT IN (SELECT t2a
+               FROM   t2
+               WHERE t2b = t1b
+               EXCEPT DISTINCT
+               SELECT t3a
+               FROM   t3)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "Filter (t2b#x = outer(t1b#x))\n+- SubqueryAlias t2\n   +- 
View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n      +- 
Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, 
cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as 
float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS 
t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]\n       
  +- Project [t2a#x, t2b [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 39,
+    "stopIndex" : 104,
+    "fragment" : "SELECT t2a\n               FROM   t2\n               WHERE 
t2b = t1b"
+  } ]
+}
diff --git 
a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-select.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-select.sql.out
index 016017ca770..0588d0a9053 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-select.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-select.sql.out
@@ -541,198 +541,3 @@ SELECT c, (
 struct<c:int,scalarsubquery(c):bigint>
 -- !query output
 6      4
-
-
--- !query
-CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0)
--- !query schema
-struct<>
--- !query output
-
-
-
--- !query
-CREATE OR REPLACE TEMP VIEW t1(t1a, t1b, t1c) AS VALUES (1, 1, 3)
--- !query schema
-struct<>
--- !query output
-
-
-
--- !query
-CREATE OR REPLACE TEMP VIEW t2(t2a, t2b, t2c) AS VALUES (1, 1, 5), (2, 2, 7)
--- !query schema
-struct<>
--- !query output
-
-
-
--- !query
-SELECT t0a, (SELECT sum(c) FROM
-  (SELECT t1c as c
-  FROM   t1
-  WHERE  t1a = t0a
-  UNION ALL
-  SELECT t2c as c
-  FROM   t2
-  WHERE  t2b = t0b)
-)
-FROM t0
--- !query schema
-struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
--- !query output
-1      8
-2      NULL
-
-
--- !query
-SELECT t0a, (SELECT sum(c) FROM
-  (SELECT t1c as c
-  FROM   t1
-  WHERE  t1a = t0a
-  UNION ALL
-  SELECT t2c as c
-  FROM   t2
-  WHERE  t2a = t0a)
-)
-FROM t0
--- !query schema
-struct<t0a:int,scalarsubquery(t0a, t0a):bigint>
--- !query output
-1      8
-2      7
-
-
--- !query
-SELECT t0a, (SELECT sum(c) FROM
-  (SELECT t1c as c
-  FROM   t1
-  WHERE  t1a > t0a
-  UNION ALL
-  SELECT t2c as c
-  FROM   t2
-  WHERE  t2b <= t0b)
-)
-FROM t0
--- !query schema
-struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
--- !query output
-1      5
-2      NULL
-
-
--- !query
-SELECT t0a, (SELECT sum(t1c) FROM
-  (SELECT t1c
-  FROM   t1
-  WHERE  t1a = t0a
-  UNION ALL
-  SELECT t2c
-  FROM   t2
-  WHERE  t2b = t0b)
-)
-FROM t0
--- !query schema
-struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
--- !query output
-1      8
-2      NULL
-
-
--- !query
-SELECT t0a, (SELECT sum(t1c) FROM
-  (SELECT t1c
-  FROM   t1
-  WHERE  t1a = t0a
-  UNION DISTINCT
-  SELECT t2c
-  FROM   t2
-  WHERE  t2b = t0b)
-)
-FROM t0
--- !query schema
-struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
--- !query output
-1      8
-2      NULL
-
-
--- !query
-SELECT t0a, (SELECT sum(t1a + 3 * t1b + 5 * t1c) FROM
-  (SELECT t1c as t1a, t1a as t1b, t0a as t1c
-  FROM   t1
-  WHERE  t1a = t0a
-  UNION ALL
-  SELECT t0a as t2b, t2c as t1a, t0b as t2c
-  FROM   t2
-  WHERE  t2b = t0b)
-)
-FROM t0
--- !query schema
-struct<t0a:int,scalarsubquery(t0a, t0a, t0a, t0b, t0b):bigint>
--- !query output
-1      32
-2      NULL
-
-
--- !query
-SELECT t0a, (SELECT count(t1c) FROM
-  (SELECT t1c
-  FROM   t1
-  WHERE  t1a = t0a
-  UNION DISTINCT
-  SELECT t2c
-  FROM   t2
-  WHERE  t2b = t0b)
-)
-FROM t0
--- !query schema
-struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
--- !query output
-1      2
-2      0
-
-
--- !query
-SELECT t0a, (SELECT sum(d) FROM
-  (SELECT t1a - t0a as d
-  FROM   t1
-  UNION ALL
-  SELECT t2a - t0a as d
-  FROM   t2)
-)
-FROM t0
--- !query schema
-struct<t0a:int,scalarsubquery(t0a, t0a):bigint>
--- !query output
-1      1
-2      -2
-
-
--- !query
-SELECT t0a, (SELECT sum(d) FROM
-  (SELECT sum(t0a) as d
-  FROM   t1
-  UNION ALL
-  SELECT sum(t2a) + t0a as d
-  FROM   t2)
-)
-FROM t0
--- !query schema
-struct<>
--- !query output
-org.apache.spark.sql.AnalysisException
-{
-  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE",
-  "sqlState" : "0A000",
-  "messageParameters" : {
-    "sqlExprs" : "\"sum(t0a) AS d\""
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 36,
-    "stopIndex" : 67,
-    "fragment" : "SELECT sum(t0a) as d\n  FROM   t1"
-  } ]
-}
diff --git 
a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out
new file mode 100644
index 00000000000..d21873cd3c8
--- /dev/null
+++ 
b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out
@@ -0,0 +1,1043 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE OR REPLACE TEMP VIEW t1(t1a, t1b, t1c) AS VALUES (1, 1, 3)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE OR REPLACE TEMP VIEW t2(t2a, t2b, t2c) AS VALUES (1, 1, 5), (2, 2, 7)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      8
+2      NULL
+
+
+-- !query
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+-- !query schema
+struct<t0a:int,t0b:int>
+-- !query output
+1      1
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2a = t0a)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a):bigint>
+-- !query output
+1      8
+2      7
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a > t0a
+  UNION ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b <= t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      5
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION ALL
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      8
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(t1a + 3 * t1b + 5 * t1c) FROM
+  (SELECT t1c as t1a, t1a as t1b, t0a as t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION ALL
+  SELECT t0a as t2b, t2c as t1a, t0b as t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a, t0a, t0b, t0b):bigint>
+-- !query output
+1      32
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT count(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION ALL
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      2
+2      0
+
+
+-- !query
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT t1a - t0a as d
+  FROM   t1
+  UNION ALL
+  SELECT t2a - t0a as d
+  FROM   t2)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a):bigint>
+-- !query output
+1      1
+2      -2
+
+
+-- !query
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT sum(t0a) as d
+  FROM   t1
+  UNION ALL
+  SELECT sum(t2a) + t0a as d
+  FROM   t2)
+)
+FROM t0
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "sqlExprs" : "\"sum(t0a) AS d\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 36,
+    "stopIndex" : 67,
+    "fragment" : "SELECT sum(t0a) as d\n  FROM   t1"
+  } ]
+}
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      8
+2      NULL
+
+
+-- !query
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+-- !query schema
+struct<t0a:int,t0b:int>
+-- !query output
+1      1
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2a = t0a)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a):bigint>
+-- !query output
+1      8
+2      7
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a > t0a
+  UNION DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b <= t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      5
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION DISTINCT
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      8
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(t1a + 3 * t1b + 5 * t1c) FROM
+  (SELECT t1c as t1a, t1a as t1b, t0a as t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION DISTINCT
+  SELECT t0a as t2b, t2c as t1a, t0b as t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a, t0a, t0b, t0b):bigint>
+-- !query output
+1      32
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT count(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  UNION DISTINCT
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      2
+2      0
+
+
+-- !query
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT t1a - t0a as d
+  FROM   t1
+  UNION DISTINCT
+  SELECT t2a - t0a as d
+  FROM   t2)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a):bigint>
+-- !query output
+1      1
+2      -1
+
+
+-- !query
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT sum(t0a) as d
+  FROM   t1
+  UNION DISTINCT
+  SELECT sum(t2a) + t0a as d
+  FROM   t2)
+)
+FROM t0
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "sqlExprs" : "\"sum(t0a) AS d\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 36,
+    "stopIndex" : 67,
+    "fragment" : "SELECT sum(t0a) as d\n  FROM   t1"
+  } ]
+}
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+-- !query schema
+struct<t0a:int,t0b:int>
+-- !query output
+
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2a = t0a)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a):bigint>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a > t0a
+  INTERSECT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b <= t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT ALL
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(t1a + 3 * t1b + 5 * t1c) FROM
+  (SELECT t1c as t1a, t1a as t1b, t0a as t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT ALL
+  SELECT t0a as t2b, t2c as t1a, t0b as t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a, t0a, t0b, t0b):bigint>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT count(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT ALL
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      0
+2      0
+
+
+-- !query
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT t1a - t0a as d
+  FROM   t1
+  INTERSECT ALL
+  SELECT t2a - t0a as d
+  FROM   t2)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a):bigint>
+-- !query output
+1      0
+2      -1
+
+
+-- !query
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT sum(t0a) as d
+  FROM   t1
+  INTERSECT ALL
+  SELECT sum(t2a) + t0a as d
+  FROM   t2)
+)
+FROM t0
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "sqlExprs" : "\"sum(t0a) AS d\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 36,
+    "stopIndex" : 67,
+    "fragment" : "SELECT sum(t0a) as d\n  FROM   t1"
+  } ]
+}
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+-- !query schema
+struct<t0a:int,t0b:int>
+-- !query output
+
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2a = t0a)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a):bigint>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a > t0a
+  INTERSECT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b <= t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT DISTINCT
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(t1a + 3 * t1b + 5 * t1c) FROM
+  (SELECT t1c as t1a, t1a as t1b, t0a as t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT DISTINCT
+  SELECT t0a as t2b, t2c as t1a, t0b as t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a, t0a, t0b, t0b):bigint>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT count(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  INTERSECT DISTINCT
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      0
+2      0
+
+
+-- !query
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT t1a - t0a as d
+  FROM   t1
+  INTERSECT DISTINCT
+  SELECT t2a - t0a as d
+  FROM   t2)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a):bigint>
+-- !query output
+1      0
+2      -1
+
+
+-- !query
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT sum(t0a) as d
+  FROM   t1
+  INTERSECT DISTINCT
+  SELECT sum(t2a) + t0a as d
+  FROM   t2)
+)
+FROM t0
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "sqlExprs" : "\"sum(t0a) AS d\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 36,
+    "stopIndex" : 67,
+    "fragment" : "SELECT sum(t0a) as d\n  FROM   t1"
+  } ]
+}
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      3
+2      NULL
+
+
+-- !query
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+-- !query schema
+struct<t0a:int,t0b:int>
+-- !query output
+1      1
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2a = t0a)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a):bigint>
+-- !query output
+1      3
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a > t0a
+  EXCEPT ALL
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b <= t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT ALL
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      3
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(t1a + 3 * t1b + 5 * t1c) FROM
+  (SELECT t1c as t1a, t1a as t1b, t0a as t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT ALL
+  SELECT t0a as t2b, t2c as t1a, t0b as t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a, t0a, t0b, t0b):bigint>
+-- !query output
+1      11
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT count(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT ALL
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      1
+2      0
+
+
+-- !query
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT t1a - t0a as d
+  FROM   t1
+  EXCEPT ALL
+  SELECT t2a - t0a as d
+  FROM   t2)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a):bigint>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT sum(t0a) as d
+  FROM   t1
+  EXCEPT ALL
+  SELECT sum(t2a) + t0a as d
+  FROM   t2)
+)
+FROM t0
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "sqlExprs" : "\"sum(t0a) AS d\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 36,
+    "stopIndex" : 67,
+    "fragment" : "SELECT sum(t0a) as d\n  FROM   t1"
+  } ]
+}
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      3
+2      NULL
+
+
+-- !query
+SELECT * FROM t0 WHERE t0a <
+(SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+-- !query schema
+struct<t0a:int,t0b:int>
+-- !query output
+1      1
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2a = t0a)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a):bigint>
+-- !query output
+1      3
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(c) FROM
+  (SELECT t1c as c
+  FROM   t1
+  WHERE  t1a > t0a
+  EXCEPT DISTINCT
+  SELECT t2c as c
+  FROM   t2
+  WHERE  t2b <= t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT DISTINCT
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      3
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(t1a + 3 * t1b + 5 * t1c) FROM
+  (SELECT t1c as t1a, t1a as t1b, t0a as t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT DISTINCT
+  SELECT t0a as t2b, t2c as t1a, t0b as t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a, t0a, t0b, t0b):bigint>
+-- !query output
+1      11
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT count(t1c) FROM
+  (SELECT t1c
+  FROM   t1
+  WHERE  t1a = t0a
+  EXCEPT DISTINCT
+  SELECT t2c
+  FROM   t2
+  WHERE  t2b = t0b)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0b):bigint>
+-- !query output
+1      1
+2      0
+
+
+-- !query
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT t1a - t0a as d
+  FROM   t1
+  EXCEPT DISTINCT
+  SELECT t2a - t0a as d
+  FROM   t2)
+)
+FROM t0
+-- !query schema
+struct<t0a:int,scalarsubquery(t0a, t0a):bigint>
+-- !query output
+1      NULL
+2      NULL
+
+
+-- !query
+SELECT t0a, (SELECT sum(d) FROM
+  (SELECT sum(t0a) as d
+  FROM   t1
+  EXCEPT DISTINCT
+  SELECT sum(t2a) + t0a as d
+  FROM   t2)
+)
+FROM t0
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "sqlExprs" : "\"sum(t0a) AS d\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 36,
+    "stopIndex" : 67,
+    "fragment" : "SELECT sum(t0a) as d\n  FROM   t1"
+  } ]
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index e61f43d8847..4d76013d659 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan, Project, Sort, Union}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
@@ -957,18 +958,17 @@ class SubquerySuite extends QueryTest
           | )
           | FROM t0""".stripMargin
 
-      {
-        val df = sql(query)
-        checkAnswer(df,
-          Row(1, 8) :: Row(2, null) :: Nil)
+      val df = sql(query)
+      checkAnswer(df,
+        Row(1, 8) :: Row(2, null) :: Nil)
+
+      val optimizedPlan = df.queryExecution.optimizedPlan
+      val aggregate = optimizedPlan.collectFirst { case a: Aggregate => a }.get
+      assert(aggregate.groupingExpressions.size == 2)
+      val union = optimizedPlan.collectFirst { case u: Union => u }.get
+      assert(union.output.size == 3)
+      assert(optimizedPlan.resolved)
 
-        val optimizedPlan = df.queryExecution.optimizedPlan
-        val aggregate = optimizedPlan.collectFirst { case a: Aggregate => a 
}.get
-        assert(aggregate.groupingExpressions.size == 2)
-        val union = optimizedPlan.collectFirst { case u: Union => u }.get
-        assert(union.output.size == 3)
-        assert(optimizedPlan.resolved)
-      }
       withSQLConf(SQLConf.DECORRELATE_INNER_QUERY_ENABLED.key -> "false") {
         val error = intercept[AnalysisException] { sql(query) }
        assert(error.getErrorClass == "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
@@ -1007,6 +1007,62 @@ class SubquerySuite extends QueryTest
     }
   }
 
+  test("SPARK-36124: Correlated subqueries with set ops") {
+    withTempView("t0", "t1", "t2") {
+      Seq((1, 1), (2, 0)).toDF("t0a", "t0b").createOrReplaceTempView("t0")
+      Seq((1, 1, 3)).toDF("t1a", "t1b", "t1c").createOrReplaceTempView("t1")
+      Seq((1, 1, 5), (2, 2, 7)).toDF("t2a", "t2b", "t2c").createOrReplaceTempView("t2")
+
+      // Union with different outer refs
+      for (setopType <- Seq("INTERSECT", "EXCEPT")) {
+        for (distinctness <- Seq("ALL", "DISTINCT")) {
+          val query =
+            s"""
+              | SELECT t0a, (SELECT sum(t1c) FROM
+              |   (SELECT t1c
+              |   FROM   t1
+              |   WHERE  t1a = t0a
+              |   ${setopType} ${distinctness}
+              |   SELECT t2c
+              |   FROM   t2
+              |   WHERE  t2b = t0b)
+              | )
+              | FROM t0""".stripMargin
+
+          val df = sql(query)
+          val optimizedPlan = df.queryExecution.optimizedPlan
+          val aggregate = optimizedPlan.collectFirst { case a: Aggregate => a }.get
+          assert(aggregate.groupingExpressions.size == 2)
+          if (distinctness == "DISTINCT") {
+            if (setopType == "INTERSECT") {
+              val join = optimizedPlan.collectFirst {
+                case j @ Join(_, _, LeftSemi, _, _) => j
+              }.get
+              assert(splitConjunctivePredicates(join.condition.get).size == 3)
+            } else {
+              val join = optimizedPlan.collectFirst {
+                case j @ Join(_, _, LeftAnti, _, _) => j
+              }.get
+              assert(splitConjunctivePredicates(join.condition.get).size == 3)
+            }
+          }
+          assert(optimizedPlan.resolved)
+
+          withSQLConf(SQLConf.DECORRELATE_INNER_QUERY_ENABLED.key -> "false") {
+            val error = intercept[AnalysisException] { sql(query) }
+            assert(error.getErrorClass == "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+              "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED")
+          }
+          withSQLConf(SQLConf.DECORRELATE_SET_OPS_ENABLED.key -> "false") {
+            val error = intercept[AnalysisException] { sql(query) }
+            assert(error.getErrorClass == "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+              "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED")
+          }
+        }
+      }
+    }
+  }
+
   // Generate operator
   test("Correlated subqueries in LATERAL VIEW") {
     withTempView("t1", "t2") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to