This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 5511a0bd1d3 [SPARK-44431][SQL] Fix behavior of null IN (empty list) in 
optimization rules
5511a0bd1d3 is described below

commit 5511a0bd1d354a99e77de4ef132dfdd1ed6bd762
Author: Jack Chen <[email protected]>
AuthorDate: Thu Jul 20 11:09:50 2023 +0800

    [SPARK-44431][SQL] Fix behavior of null IN (empty list) in optimization 
rules
    
    ### What changes were proposed in this pull request?
    `null IN (empty list)` incorrectly evaluates to null, when it should 
evaluate to false. (The reason it should be false is because a IN (b1, b2) is 
defined as a = b1 OR a = b2, and an empty IN list is treated as an empty OR 
which is false. This is specified by ANSI SQL.)
    
    Many places in Spark execution (In, InSet, InSubquery) and optimization 
(OptimizeIn, NullPropagation) implemented this wrong behavior. This is a 
longstanding correctness issue which has existed since null support for IN 
expressions was first added to Spark.
    
    This PR fixes the optimization rules OptimizeIn and NullPropagation, which 
followed the preexisting, incorrect execution behavior. The execution fixes 
will be in the next PR.
    
    The behavior is under a flag, which will be available to revert to the 
legacy behavior if needed. This flag is set to disable the new behavior until 
all of the fix PRs are complete.
    
    See [this 
doc](https://docs.google.com/document/d/1k8AY8oyT-GI04SnP7eXttPDnDj-Ek-c3luF2zL6DPNU/edit)
 for more information.
    
    ### Why are the changes needed?
    Fix wrong SQL semantics
    
    ### Does this PR introduce _any_ user-facing change?
    Not yet, but will fix wrong SQL semantics when enabled
    
    ### How was this patch tested?
    Add unit tests and sql tests.
    
    Closes #42007 from jchen5/null-in-empty-opt.
    
    Authored-by: Jack Chen <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit db357edb7b21b12c9721a86985a3cc92fcd32bf3)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |   3 +
 .../spark/sql/catalyst/optimizer/expressions.scala |  32 ++-
 .../org/apache/spark/sql/internal/SQLConf.scala    |  11 +
 .../sql/catalyst/optimizer/OptimizeInSuite.scala   | 174 +++++++++---
 .../catalyst/optimizer/ReplaceOperatorSuite.scala  |  26 +-
 .../subquery/in-subquery/in-null-semantics.sql.out | 304 +++++++++++++++++++++
 .../subquery/in-subquery/in-null-semantics.sql     |  57 ++++
 .../subquery/in-subquery/in-null-semantics.sql.out | 239 ++++++++++++++++
 .../scala/org/apache/spark/sql/EmptyInSuite.scala  |  63 +++++
 9 files changed, 857 insertions(+), 52 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index fd2ea96a296..95cf3aee16f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -96,6 +96,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
         OptimizeRepartition,
         TransposeWindow,
         NullPropagation,
+        // NullPropagation may introduce Exists subqueries, so 
RewriteNonCorrelatedExists must run
+        // after.
+        RewriteNonCorrelatedExists,
         NullDownPropagation,
         ConstantPropagation,
         FoldablePropagation,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 1d756a2dcb7..8cb560199c0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.{AlwaysProcess, TreeNodeTag}
 import org.apache.spark.sql.catalyst.trees.TreePattern._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -283,9 +284,13 @@ object OptimizeIn extends Rule[LogicalPlan] {
     _.containsPattern(IN), ruleId) {
     case q: LogicalPlan => 
q.transformExpressionsDownWithPruning(_.containsPattern(IN), ruleId) {
       case In(v, list) if list.isEmpty =>
-        // When v is not nullable, the following expression will be optimized
-        // to FalseLiteral which is tested in OptimizeInSuite.scala
-        If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
+        if (!SQLConf.get.getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR)) {
+          FalseLiteral
+        } else {
+          // Incorrect legacy behavior optimizes to null if the left side is 
null, and otherwise
+          // to false.
+          If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
+        }
       case expr @ In(v, list) if expr.inSetConvertible =>
         val newList = ExpressionSet(list).toSeq
         if (newList.length == 1
@@ -841,9 +846,24 @@ object NullPropagation extends Rule[LogicalPlan] {
           }
         }
 
-      // If the value expression is NULL then transform the In expression to 
null literal.
-      case In(Literal(null, _), _) => Literal.create(null, BooleanType)
-      case InSubquery(Seq(Literal(null, _)), _) => Literal.create(null, 
BooleanType)
+      // If the list is empty, transform the In expression to false literal.
+      case In(_, list)
+        if list.isEmpty && 
!SQLConf.get.getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR) =>
+        Literal.create(false, BooleanType)
+      // If the value expression is NULL (and the list is non-empty), then 
transform the
+      // In expression to null literal.
+      // If the legacy flag is set, then it becomes null even if the list is 
empty (which is
+      // incorrect legacy behavior)
+      case In(Literal(null, _), list)
+        if list.nonEmpty || 
SQLConf.get.getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR)
+      => Literal.create(null, BooleanType)
+      case InSubquery(Seq(Literal(null, _)), _)
+        if SQLConf.get.getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR) =>
+        Literal.create(null, BooleanType)
+      case InSubquery(Seq(Literal(null, _)), ListQuery(sub, _, _, _, 
conditions, _))
+        if !SQLConf.get.getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR)
+        && conditions.isEmpty =>
+        If(Exists(sub), Literal(null, BooleanType), FalseLiteral)
 
       // Non-leaf NullIntolerant expressions will return null, if at least one 
of its children is
       // a null literal.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4d1d4b45fa3..00bb6f77ef3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4289,6 +4289,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR =
+    buildConf("spark.sql.legacy.nullInEmptyListBehavior")
+      .internal()
+      .doc("When set to true, restores the legacy incorrect behavior of IN 
expressions for " +
+        "NULL values IN an empty list (including IN subqueries and literal IN 
lists): " +
+        "`null IN (empty list)` should evaluate to false, but sometimes (not 
always) " +
+        "incorrectly evaluates to null in the legacy behavior.")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val ERROR_MESSAGE_FORMAT = buildConf("spark.sql.error.messageFormat")
     .doc("When PRETTY, the error message consists of textual representation of 
error class, " +
       "message and query context. The MINIMAL and STANDARD formats are pretty 
JSON formats where " +
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
index 7f377d18e9d..7418128dd48 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -22,9 +22,9 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, 
LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import 
org.apache.spark.sql.internal.SQLConf.OPTIMIZER_INSET_CONVERSION_THRESHOLD
+import 
org.apache.spark.sql.internal.SQLConf.{LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR, 
OPTIMIZER_INSET_CONVERSION_THRESHOLD}
 import org.apache.spark.sql.types._
 
 class OptimizeInSuite extends PlanTest {
@@ -121,19 +121,43 @@ class OptimizeInSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
-  test("OptimizedIn test: NULL IN (subquery) gets transformed to 
Filter(null)") {
-    val subquery = ListQuery(testRelation.select(UnresolvedAttribute("a")))
-    val originalQuery =
-      testRelation
-        .where(InSubquery(Seq(Literal.create(null, NullType)), subquery))
-        .analyze
+  test("OptimizedIn test: Legacy behavior: " +
+    "NULL IN (subquery) gets transformed to Filter(null)") {
+    withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "true") {
+      val subquery = ListQuery(testRelation.select(UnresolvedAttribute("a")))
+      val originalQuery =
+        testRelation
+          .where(InSubquery(Seq(Literal.create(null, NullType)), subquery))
+          .analyze
 
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer =
-      testRelation
-        .where(Literal.create(null, BooleanType))
-        .analyze
-    comparePlans(optimized, correctAnswer)
+      val optimized = Optimize.execute(originalQuery.analyze)
+      val correctAnswer =
+        testRelation
+          .where(Literal.create(null, BooleanType))
+          .analyze
+      comparePlans(optimized, correctAnswer)
+    }
+  }
+
+  test("OptimizedIn test: NULL IN (subquery) gets transformed to " +
+    "If(Exists(subquery), null, false)") {
+    withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "false") {
+      val subquery = testRelation.select(UnresolvedAttribute("a"))
+      val originalQuery =
+        testRelation
+          .where(InSubquery(Seq(Literal.create(null, NullType)), 
ListQuery(subquery)))
+          .analyze
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      // Our simplified Optimize results in an extra redundant Project. This 
gets collapsed in
+      // the full optimizer.
+      val correctAnswer =
+        testRelation
+          .where(If(Exists(Project(Seq(UnresolvedAttribute("a")), subquery)),
+            Literal.create(null, BooleanType), Literal(false)))
+          .analyze
+      comparePlans(optimized, correctAnswer)
+    }
   }
 
   test("OptimizedIn test: Inset optimization disabled as " +
@@ -219,36 +243,108 @@ class OptimizeInSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
-  test("OptimizedIn test: In empty list gets transformed to FalseLiteral " +
-    "when value is not nullable") {
-    val originalQuery =
-      testRelation
-        .where(In(Literal("a"), Nil))
-        .analyze
+  test("OptimizedIn test: expr IN (empty list) gets transformed to literal 
false") {
+    withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "false") {
+      val originalQuery =
+        testRelation
+          .where(In(UnresolvedAttribute("a"), Nil))
+          .analyze
 
-    val optimized = Optimize.execute(originalQuery)
-    val correctAnswer =
-      testRelation
-        .where(Literal(false))
-        .analyze
+      val optimized = Optimize.execute(originalQuery.analyze)
+      val correctAnswer =
+        testRelation
+          .where(Literal.create(false, BooleanType))
+          .analyze
 
-    comparePlans(optimized, correctAnswer)
+      comparePlans(optimized, correctAnswer)
+    }
   }
 
-  test("OptimizedIn test: In empty list gets transformed to `If` expression " +
-    "when value is nullable") {
-    val originalQuery =
-      testRelation
-        .where(In(UnresolvedAttribute("a"), Nil))
-        .analyze
+  test("OptimizedIn test: null IN (empty list) gets transformed to literal 
false") {
+    withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "false") {
+      val originalQuery =
+        testRelation
+          .where(In(Literal.create(null, NullType), Nil))
+          .analyze
 
-    val optimized = Optimize.execute(originalQuery)
-    val correctAnswer =
-      testRelation
-        .where(If(IsNotNull(UnresolvedAttribute("a")),
-          Literal(false), Literal.create(null, BooleanType)))
-        .analyze
+      val optimized = Optimize.execute(originalQuery.analyze)
+      val correctAnswer =
+        testRelation
+          .where(Literal.create(false, BooleanType))
+          .analyze
 
-    comparePlans(optimized, correctAnswer)
+      comparePlans(optimized, correctAnswer)
+    }
+  }
+
+  test("OptimizedIn test: expr IN (empty list) gets transformed to literal 
false in select") {
+    withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "false") {
+      val originalQuery =
+        testRelation
+          .select(In(UnresolvedAttribute("a"), Nil).as("x"))
+          .analyze
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      val correctAnswer =
+        testRelation
+          .select(Literal.create(false, BooleanType).as("x"))
+          .analyze
+
+      comparePlans(optimized, correctAnswer)
+    }
+  }
+
+  test("OptimizedIn test: null IN (empty list) gets transformed to literal 
false in select") {
+    withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "false") {
+      val originalQuery =
+        testRelation
+          .select(In(Literal.create(null, NullType), Nil).as("x"))
+          .analyze
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      val correctAnswer =
+        testRelation
+          .select(Literal.create(false, BooleanType).as("x"))
+          .analyze
+
+      comparePlans(optimized, correctAnswer)
+    }
+  }
+
+  test("OptimizedIn test: Legacy behavior: " +
+    "In empty list gets transformed to FalseLiteral when value is not 
nullable") {
+    withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "true") {
+      val originalQuery =
+        testRelation
+          .where(In(Literal("a"), Nil))
+          .analyze
+
+      val optimized = Optimize.execute(originalQuery)
+      val correctAnswer =
+        testRelation
+          .where(Literal(false))
+          .analyze
+
+      comparePlans(optimized, correctAnswer)
+    }
+  }
+
+  test("OptimizedIn test: Legacy behavior:  " +
+    "In empty list gets transformed to `If` expression when value is 
nullable") {
+    withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "true") {
+      val originalQuery =
+        testRelation
+          .where(In(UnresolvedAttribute("a"), Nil))
+          .analyze
+
+      val optimized = Optimize.execute(originalQuery)
+      val correctAnswer =
+        testRelation
+          .where(If(IsNotNull(UnresolvedAttribute("a")),
+            Literal(false), Literal.create(null, BooleanType)))
+          .analyze
+
+      comparePlans(optimized, correctAnswer)
+    }
   }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index 06fcb12acdd..5d81e96a8e5 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -25,6 +25,7 @@ import 
org.apache.spark.sql.catalyst.expressions.aggregate.First
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.internal.SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR
 import org.apache.spark.sql.types.BooleanType
 
 class ReplaceOperatorSuite extends PlanTest {
@@ -233,13 +234,24 @@ class ReplaceOperatorSuite extends PlanTest {
     val basePlan = LocalRelation(Seq($"a".int, $"b".int))
     val otherPlan = basePlan.where($"a".in(1, 2) || $"b".in())
     val except = Except(basePlan, otherPlan, false)
-    val result = OptimizeIn(Optimize.execute(except.analyze))
-    val correctAnswer = Aggregate(basePlan.output, basePlan.output,
-      Filter(!Coalesce(Seq(
-        $"a".in(1, 2) || If($"b".isNotNull, Literal.FalseLiteral, 
Literal(null, BooleanType)),
-        Literal.FalseLiteral)),
-        basePlan)).analyze
-    comparePlans(result, correctAnswer)
+    withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "false") {
+      val result = OptimizeIn(Optimize.execute(except.analyze))
+      val correctAnswer = Aggregate(basePlan.output, basePlan.output,
+        Filter(!Coalesce(Seq(
+          $"a".in(1, 2) || Literal.FalseLiteral,
+          Literal.FalseLiteral)),
+          basePlan)).analyze
+      comparePlans(result, correctAnswer)
+    }
+    withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "true") {
+      val result = OptimizeIn(Optimize.execute(except.analyze))
+      val correctAnswer = Aggregate(basePlan.output, basePlan.output,
+        Filter(!Coalesce(Seq(
+          $"a".in(1, 2) || If($"b".isNotNull, Literal.FalseLiteral, 
Literal(null, BooleanType)),
+          Literal.FalseLiteral)),
+          basePlan)).analyze
+      comparePlans(result, correctAnswer)
+    }
   }
 
   test("SPARK-26366: ReplaceExceptWithFilter should not transform 
non-deterministic") {
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-null-semantics.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-null-semantics.sql.out
new file mode 100644
index 00000000000..ac5c41dd307
--- /dev/null
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-null-semantics.sql.out
@@ -0,0 +1,304 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+create temp view v (c) as values (1), (null)
+-- !query analysis
+CreateViewCommand `v`, [(c,None)], values (1), (null), false, false, 
LocalTempView, true
+   +- LocalRelation [col1#x]
+
+
+-- !query
+create temp view v_empty (e) as select 1 where false
+-- !query analysis
+CreateViewCommand `v_empty`, [(e,None)], select 1 where false, false, false, 
LocalTempView, true
+   +- Project [1 AS 1#x]
+      +- Filter false
+         +- OneRowRelation
+
+
+-- !query
+create table t(c int) using json
+-- !query analysis
+CreateDataSourceTableCommand `spark_catalog`.`default`.`t`, false
+
+
+-- !query
+insert into t values (1), (null)
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in 
comparison]/{warehouse_dir}/t, false, JSON, [path=file:[not included in 
comparison]/{warehouse_dir}/t], Append, `spark_catalog`.`default`.`t`, 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included 
in comparison]/{warehouse_dir}/t), [c]
++- Project [cast(col1#x as int) AS c#x]
+   +- LocalRelation [col1#x]
+
+
+-- !query
+create table t2(d int) using json
+-- !query analysis
+CreateDataSourceTableCommand `spark_catalog`.`default`.`t2`, false
+
+
+-- !query
+insert into t2 values (2)
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in 
comparison]/{warehouse_dir}/t2, false, JSON, [path=file:[not included in 
comparison]/{warehouse_dir}/t2], Append, `spark_catalog`.`default`.`t2`, 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included 
in comparison]/{warehouse_dir}/t2), [d]
++- Project [cast(col1#x as int) AS d#x]
+   +- LocalRelation [col1#x]
+
+
+-- !query
+create table t_empty(e int) using json
+-- !query analysis
+CreateDataSourceTableCommand `spark_catalog`.`default`.`t_empty`, false
+
+
+-- !query
+set spark.sql.legacy.nullInEmptyListBehavior = false
+-- !query analysis
+SetCommand (spark.sql.legacy.nullInEmptyListBehavior,Some(false))
+
+
+-- !query
+select c, c in (select e from t_empty) from t
+-- !query analysis
+Project [c#x, c#x IN (list#x []) AS (c IN (listquery()))#x]
+:  +- Project [e#x]
+:     +- SubqueryAlias spark_catalog.default.t_empty
+:        +- Relation spark_catalog.default.t_empty[e#x] json
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[c#x] json
+
+
+-- !query
+select c, c in (select e from v_empty) from v
+-- !query analysis
+Project [c#x, c#x IN (list#x []) AS (c IN (listquery()))#x]
+:  +- Project [e#x]
+:     +- SubqueryAlias v_empty
+:        +- View (`v_empty`, [e#x])
+:           +- Project [cast(1#x as int) AS e#x]
+:              +- Project [1 AS 1#x]
+:                 +- Filter false
+:                    +- OneRowRelation
++- SubqueryAlias v
+   +- View (`v`, [c#x])
+      +- Project [cast(col1#x as int) AS c#x]
+         +- LocalRelation [col1#x]
+
+
+-- !query
+select c, c not in (select e from t_empty) from t
+-- !query analysis
+Project [c#x, NOT c#x IN (list#x []) AS (NOT (c IN (listquery())))#x]
+:  +- Project [e#x]
+:     +- SubqueryAlias spark_catalog.default.t_empty
+:        +- Relation spark_catalog.default.t_empty[e#x] json
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[c#x] json
+
+
+-- !query
+select c, c not in (select e from v_empty) from v
+-- !query analysis
+Project [c#x, NOT c#x IN (list#x []) AS (NOT (c IN (listquery())))#x]
+:  +- Project [e#x]
+:     +- SubqueryAlias v_empty
+:        +- View (`v_empty`, [e#x])
+:           +- Project [cast(1#x as int) AS e#x]
+:              +- Project [1 AS 1#x]
+:                 +- Filter false
+:                    +- OneRowRelation
++- SubqueryAlias v
+   +- View (`v`, [c#x])
+      +- Project [cast(col1#x as int) AS c#x]
+         +- LocalRelation [col1#x]
+
+
+-- !query
+select null in (select e from t_empty)
+-- !query analysis
+Project [cast(null as int) IN (list#x []) AS (NULL IN (listquery()))#x]
+:  +- Project [e#x]
+:     +- Project [e#x]
+:        +- SubqueryAlias spark_catalog.default.t_empty
+:           +- Relation spark_catalog.default.t_empty[e#x] json
++- OneRowRelation
+
+
+-- !query
+select null in (select e from v_empty)
+-- !query analysis
+Project [cast(null as int) IN (list#x []) AS (NULL IN (listquery()))#x]
+:  +- Project [e#x]
+:     +- Project [e#x]
+:        +- SubqueryAlias v_empty
+:           +- View (`v_empty`, [e#x])
+:              +- Project [cast(1#x as int) AS e#x]
+:                 +- Project [1 AS 1#x]
+:                    +- Filter false
+:                       +- OneRowRelation
++- OneRowRelation
+
+
+-- !query
+select null not in (select e from t_empty)
+-- !query analysis
+Project [NOT cast(null as int) IN (list#x []) AS (NOT (NULL IN 
(listquery())))#x]
+:  +- Project [e#x]
+:     +- Project [e#x]
+:        +- SubqueryAlias spark_catalog.default.t_empty
+:           +- Relation spark_catalog.default.t_empty[e#x] json
++- OneRowRelation
+
+
+-- !query
+select null not in (select e from v_empty)
+-- !query analysis
+Project [NOT cast(null as int) IN (list#x []) AS (NOT (NULL IN 
(listquery())))#x]
+:  +- Project [e#x]
+:     +- Project [e#x]
+:        +- SubqueryAlias v_empty
+:           +- View (`v_empty`, [e#x])
+:              +- Project [cast(1#x as int) AS e#x]
+:                 +- Project [1 AS 1#x]
+:                    +- Filter false
+:                       +- OneRowRelation
++- OneRowRelation
+
+
+-- !query
+select * from t left join t2 on (t.c in (select e from t_empty)) is null
+-- !query analysis
+Project [c#x, d#x]
++- Join LeftOuter, isnull(c#x IN (list#x []))
+   :  +- Project [e#x]
+   :     +- SubqueryAlias spark_catalog.default.t_empty
+   :        +- Relation spark_catalog.default.t_empty[e#x] json
+   :- SubqueryAlias spark_catalog.default.t
+   :  +- Relation spark_catalog.default.t[c#x] json
+   +- SubqueryAlias spark_catalog.default.t2
+      +- Relation spark_catalog.default.t2[d#x] json
+
+
+-- !query
+select * from t left join t2 on (t.c not in (select e from t_empty)) is null
+-- !query analysis
+Project [c#x, d#x]
++- Join LeftOuter, isnull(NOT c#x IN (list#x []))
+   :  +- Project [e#x]
+   :     +- SubqueryAlias spark_catalog.default.t_empty
+   :        +- Relation spark_catalog.default.t_empty[e#x] json
+   :- SubqueryAlias spark_catalog.default.t
+   :  +- Relation spark_catalog.default.t[c#x] json
+   +- SubqueryAlias spark_catalog.default.t2
+      +- Relation spark_catalog.default.t2[d#x] json
+
+
+-- !query
+set spark.sql.legacy.nullInEmptyListBehavior = true
+-- !query analysis
+SetCommand (spark.sql.legacy.nullInEmptyListBehavior,Some(true))
+
+
+-- !query
+select null in (select e from t_empty)
+-- !query analysis
+Project [cast(null as int) IN (list#x []) AS (NULL IN (listquery()))#x]
+:  +- Project [e#x]
+:     +- Project [e#x]
+:        +- SubqueryAlias spark_catalog.default.t_empty
+:           +- Relation spark_catalog.default.t_empty[e#x] json
++- OneRowRelation
+
+
+-- !query
+select null in (select e from v_empty)
+-- !query analysis
+Project [cast(null as int) IN (list#x []) AS (NULL IN (listquery()))#x]
+:  +- Project [e#x]
+:     +- Project [e#x]
+:        +- SubqueryAlias v_empty
+:           +- View (`v_empty`, [e#x])
+:              +- Project [cast(1#x as int) AS e#x]
+:                 +- Project [1 AS 1#x]
+:                    +- Filter false
+:                       +- OneRowRelation
++- OneRowRelation
+
+
+-- !query
+select null not in (select e from t_empty)
+-- !query analysis
+Project [NOT cast(null as int) IN (list#x []) AS (NOT (NULL IN 
(listquery())))#x]
+:  +- Project [e#x]
+:     +- Project [e#x]
+:        +- SubqueryAlias spark_catalog.default.t_empty
+:           +- Relation spark_catalog.default.t_empty[e#x] json
++- OneRowRelation
+
+
+-- !query
+select null not in (select e from v_empty)
+-- !query analysis
+Project [NOT cast(null as int) IN (list#x []) AS (NOT (NULL IN 
(listquery())))#x]
+:  +- Project [e#x]
+:     +- Project [e#x]
+:        +- SubqueryAlias v_empty
+:           +- View (`v_empty`, [e#x])
+:              +- Project [cast(1#x as int) AS e#x]
+:                 +- Project [1 AS 1#x]
+:                    +- Filter false
+:                       +- OneRowRelation
++- OneRowRelation
+
+
+-- !query
+select * from t left join t2 on (t.c in (select e from t_empty)) is null
+-- !query analysis
+Project [c#x, d#x]
++- Join LeftOuter, isnull(c#x IN (list#x []))
+   :  +- Project [e#x]
+   :     +- SubqueryAlias spark_catalog.default.t_empty
+   :        +- Relation spark_catalog.default.t_empty[e#x] json
+   :- SubqueryAlias spark_catalog.default.t
+   :  +- Relation spark_catalog.default.t[c#x] json
+   +- SubqueryAlias spark_catalog.default.t2
+      +- Relation spark_catalog.default.t2[d#x] json
+
+
+-- !query
+select * from t left join t2 on (t.c not in (select e from t_empty)) is null
+-- !query analysis
+Project [c#x, d#x]
++- Join LeftOuter, isnull(NOT c#x IN (list#x []))
+   :  +- Project [e#x]
+   :     +- SubqueryAlias spark_catalog.default.t_empty
+   :        +- Relation spark_catalog.default.t_empty[e#x] json
+   :- SubqueryAlias spark_catalog.default.t
+   :  +- Relation spark_catalog.default.t[c#x] json
+   +- SubqueryAlias spark_catalog.default.t2
+      +- Relation spark_catalog.default.t2[d#x] json
+
+
+-- !query
+reset spark.sql.legacy.nullInEmptyListBehavior
+-- !query analysis
+ResetCommand spark.sql.legacy.nullInEmptyListBehavior
+
+
+-- !query
+drop table t
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t
+
+
+-- !query
+drop table t2
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t2
+
+
+-- !query
+drop table t_empty
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t_empty
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-null-semantics.sql
 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-null-semantics.sql
new file mode 100644
index 00000000000..b893d8970b4
--- /dev/null
+++ 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-null-semantics.sql
@@ -0,0 +1,57 @@
+create temp view v (c) as values (1), (null);
+create temp view v_empty (e) as select 1 where false;
+
+-- Note: tables and temp views hit different optimization/execution codepaths
+create table t(c int) using json;
+insert into t values (1), (null);
+create table t2(d int) using json;
+insert into t2 values (2);
+create table t_empty(e int) using json;
+
+
+
+set spark.sql.legacy.nullInEmptyListBehavior = false;
+
+-- null IN (empty subquery)
+-- Correct results: c in (emptylist) should always be false
+
+select c, c in (select e from t_empty) from t;
+select c, c in (select e from v_empty) from v;
+select c, c not in (select e from t_empty) from t;
+select c, c not in (select e from v_empty) from v;
+
+-- constant null IN (empty subquery) - rewritten by NullPropagation rule
+
+select null in (select e from t_empty);
+select null in (select e from v_empty);
+select null not in (select e from t_empty);
+select null not in (select e from v_empty);
+
+-- IN subquery which is not rewritten to join - here we use IN in the ON 
condition because that is a case that doesn't get rewritten to join in 
RewritePredicateSubquery, so we can observe the execution behavior of 
InSubquery directly
+-- Correct results: column t2.d should be NULL because the ON condition is 
always false
+-- This will be fixed by the execution fixes.
+select * from t left join t2 on (t.c in (select e from t_empty)) is null;
+select * from t left join t2 on (t.c not in (select e from t_empty)) is null;
+
+
+
+-- Test legacy behavior flag
+set spark.sql.legacy.nullInEmptyListBehavior = true;
+
+-- constant null IN (empty subquery) - rewritten by NullPropagation rule
+
+select null in (select e from t_empty);
+select null in (select e from v_empty);
+select null not in (select e from t_empty);
+select null not in (select e from v_empty);
+
+-- IN subquery which is not rewritten to join - here we use IN in the ON 
condition because that is a case that doesn't get rewritten to join in 
RewritePredicateSubquery, so we can observe the execution behavior of 
InSubquery directly
+-- Correct results: column t2.d should be NULL because the ON condition is 
always false
+select * from t left join t2 on (t.c in (select e from t_empty)) is null;
+select * from t left join t2 on (t.c not in (select e from t_empty)) is null;
+
+reset spark.sql.legacy.nullInEmptyListBehavior;
+
+drop table t;
+drop table t2;
+drop table t_empty;
diff --git 
a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-null-semantics.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-null-semantics.sql.out
new file mode 100644
index 00000000000..39b03576baa
--- /dev/null
+++ 
b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-null-semantics.sql.out
@@ -0,0 +1,239 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+create temp view v (c) as values (1), (null)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+create temp view v_empty (e) as select 1 where false
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+create table t(c int) using json
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+insert into t values (1), (null)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+create table t2(d int) using json
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+insert into t2 values (2)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+create table t_empty(e int) using json
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+set spark.sql.legacy.nullInEmptyListBehavior = false
+-- !query schema
+struct<key:string,value:string>
+-- !query output
+spark.sql.legacy.nullInEmptyListBehavior       false
+
+
+-- !query
+select c, c in (select e from t_empty) from t
+-- !query schema
+struct<c:int,(c IN (listquery())):boolean>
+-- !query output
+1      false
+NULL   false
+
+
+-- !query
+select c, c in (select e from v_empty) from v
+-- !query schema
+struct<c:int,(c IN (listquery())):boolean>
+-- !query output
+1      false
+NULL   false
+
+
+-- !query
+select c, c not in (select e from t_empty) from t
+-- !query schema
+struct<c:int,(NOT (c IN (listquery()))):boolean>
+-- !query output
+1      true
+NULL   true
+
+
+-- !query
+select c, c not in (select e from v_empty) from v
+-- !query schema
+struct<c:int,(NOT (c IN (listquery()))):boolean>
+-- !query output
+1      true
+NULL   true
+
+
+-- !query
+select null in (select e from t_empty)
+-- !query schema
+struct<(NULL IN (listquery())):boolean>
+-- !query output
+false
+
+
+-- !query
+select null in (select e from v_empty)
+-- !query schema
+struct<(NULL IN (listquery())):boolean>
+-- !query output
+false
+
+
+-- !query
+select null not in (select e from t_empty)
+-- !query schema
+struct<(NOT (NULL IN (listquery()))):boolean>
+-- !query output
+true
+
+
+-- !query
+select null not in (select e from v_empty)
+-- !query schema
+struct<(NOT (NULL IN (listquery()))):boolean>
+-- !query output
+true
+
+
+-- !query
+select * from t left join t2 on (t.c in (select e from t_empty)) is null
+-- !query schema
+struct<c:int,d:int>
+-- !query output
+1      NULL
+NULL   2
+
+
+-- !query
+select * from t left join t2 on (t.c not in (select e from t_empty)) is null
+-- !query schema
+struct<c:int,d:int>
+-- !query output
+1      NULL
+NULL   2
+
+
+-- !query
+set spark.sql.legacy.nullInEmptyListBehavior = true
+-- !query schema
+struct<key:string,value:string>
+-- !query output
+spark.sql.legacy.nullInEmptyListBehavior       true
+
+
+-- !query
+select null in (select e from t_empty)
+-- !query schema
+struct<(NULL IN (listquery())):boolean>
+-- !query output
+NULL
+
+
+-- !query
+select null in (select e from v_empty)
+-- !query schema
+struct<(NULL IN (listquery())):boolean>
+-- !query output
+NULL
+
+
+-- !query
+select null not in (select e from t_empty)
+-- !query schema
+struct<(NOT (NULL IN (listquery()))):boolean>
+-- !query output
+NULL
+
+
+-- !query
+select null not in (select e from v_empty)
+-- !query schema
+struct<(NOT (NULL IN (listquery()))):boolean>
+-- !query output
+NULL
+
+
+-- !query
+select * from t left join t2 on (t.c in (select e from t_empty)) is null
+-- !query schema
+struct<c:int,d:int>
+-- !query output
+1      NULL
+NULL   2
+
+
+-- !query
+select * from t left join t2 on (t.c not in (select e from t_empty)) is null
+-- !query schema
+struct<c:int,d:int>
+-- !query output
+1      NULL
+NULL   2
+
+
+-- !query
+reset spark.sql.legacy.nullInEmptyListBehavior
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+drop table t
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+drop table t2
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+drop table t_empty
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/EmptyInSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/EmptyInSuite.scala
new file mode 100644
index 00000000000..c9e016c891e
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/EmptyInSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class EmptyInSuite extends QueryTest
+with SharedSparkSession {
+  import testImplicits._
+
+  val row = identity[(java.lang.Integer, java.lang.Double)](_)
+
+  lazy val t = Seq(
+    row((1, 1.0)),
+    row((null, 2.0))).toDF("a", "b")
+
+  test("IN with empty list") {
+    // This test has to be written in scala to construct a literal empty IN 
list, since that
+    // isn't valid syntax in SQL.
+    val emptylist = Seq.empty[Literal]
+
+    Seq(true, false).foreach { legacyNullInBehavior =>
+      // To observe execution behavior, disable the OptimizeIn rule which 
optimizes away empty lists
+      Seq(true, false).foreach { disableOptimizeIn =>
+        // Disable ConvertToLocalRelation since it would collapse the 
evaluation of the IN
+        // expression over the LocalRelation
+        var excludedRules = 
"org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation"
+        if (disableOptimizeIn) {
+          excludedRules += 
",org.apache.spark.sql.catalyst.optimizer.OptimizeIn"
+        }
+        withSQLConf(
+          SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> excludedRules,
+          SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> 
legacyNullInBehavior.toString) {
+          // We still get legacy behavior with disableOptimizeIn until 
execution is also fixed
+          val expectedResultForNullInEmpty =
+            if (legacyNullInBehavior || disableOptimizeIn) null else false
+          val df = t.select(col("a"), col("a").isin(emptylist: _*))
+          checkAnswer(
+            df,
+            Row(1, false) :: Row(null, expectedResultForNullInEmpty) :: Nil)
+        }
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to