This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0704e95  [SPARK-38132][SQL] Remove `NotPropagation` rule
0704e95 is described below

commit 0704e957266aac0fa4247eb2e53d019348188164
Author: Kazuyuki Tanimura <[email protected]>
AuthorDate: Mon Feb 7 16:18:08 2022 -0800

    [SPARK-38132][SQL] Remove `NotPropagation` rule
    
    ### What changes were proposed in this pull request?
    This is a follow-up PR to mitigate the bug introduced by SPARK-36665. This 
PR removes `NotPropagation` optimization for now until we find a better 
approach.
    
    ### Why are the changes needed?
    `NotPropagation` optimization previously broke `RewritePredicateSubquery` 
so that it does not properly rewrite the predicate to a NULL-aware left anti 
join anymore.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests
    
    Closes #35428 from kazuyukitanimura/SPARK-36665-fix.
    
    Authored-by: Kazuyuki Tanimura <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |   1 -
 .../spark/sql/catalyst/optimizer/expressions.scala |  47 ------
 .../sql/catalyst/rules/RuleIdCollection.scala      |   1 -
 .../catalyst/optimizer/NotPropagationSuite.scala   | 176 ---------------------
 .../optimizer/NullDownPropagationSuite.scala       |   1 -
 .../scala/org/apache/spark/sql/SubquerySuite.scala |  26 +++
 6 files changed, 26 insertions(+), 226 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 8fba271..61d6e39 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -108,7 +108,6 @@ abstract class Optimizer(catalogManager: CatalogManager)
         EliminateAggregateFilter,
         ReorderAssociativeOperator,
         LikeSimplification,
-        NotPropagation,
         BooleanSimplification,
         SimplifyConditionals,
         PushFoldableIntoBranches,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index eda4217..c1e5783 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -448,53 +448,6 @@ object BooleanSimplification extends Rule[LogicalPlan] 
with PredicateHelper {
 
 
 /**
- * Move/Push `Not` operator if it's beneficial.
- */
-object NotPropagation extends Rule[LogicalPlan] {
-  // Given argument x, return true if expression Not(x) can be simplified
-  // E.g. let x == Not(y), then canSimplifyNot(x) == true because Not(x) == 
Not(Not(y)) == y
-  // For the case of x = EqualTo(a, b), recursively check each child expression
-  // Extra nullable check is required for EqualNullSafe because
-  // Not(EqualNullSafe(e, null)) is different from EqualNullSafe(e, Not(null))
-  private def canSimplifyNot(x: Expression): Boolean = x match {
-    case Literal(_, BooleanType) | Literal(_, NullType) => true
-    case _: Not | _: IsNull | _: IsNotNull | _: And | _: Or => true
-    case _: GreaterThan | _: GreaterThanOrEqual | _: LessThan | _: 
LessThanOrEqual => true
-    case EqualTo(a, b) if canSimplifyNot(a) || canSimplifyNot(b) => true
-    case EqualNullSafe(a, b)
-      if !a.nullable && !b.nullable && (canSimplifyNot(a) || 
canSimplifyNot(b)) => true
-    case _ => false
-  }
-
-  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
-    _.containsPattern(NOT), ruleId) {
-    case q: LogicalPlan => 
q.transformExpressionsDownWithPruning(_.containsPattern(NOT), ruleId) {
-      // Move `Not` from one side of `EqualTo`/`EqualNullSafe` to the other 
side if it's beneficial.
-      // E.g. `EqualTo(Not(a), b)` where `b = Not(c)`, it will become
-      // `EqualTo(a, Not(b))` => `EqualTo(a, Not(Not(c)))` => `EqualTo(a, c)`
-      // In addition, `if canSimplifyNot(b)` checks if the optimization can 
converge
-      // that avoids the situation two conditions are returning to each other.
-      case EqualTo(Not(a), b) if !canSimplifyNot(a) && canSimplifyNot(b) => 
EqualTo(a, Not(b))
-      case EqualTo(a, Not(b)) if canSimplifyNot(a) && !canSimplifyNot(b) => 
EqualTo(Not(a), b)
-      case EqualNullSafe(Not(a), b) if !canSimplifyNot(a) && canSimplifyNot(b) 
=>
-        EqualNullSafe(a, Not(b))
-      case EqualNullSafe(a, Not(b)) if canSimplifyNot(a) && !canSimplifyNot(b) 
=>
-        EqualNullSafe(Not(a), b)
-
-      // Push `Not` to one side of `EqualTo`/`EqualNullSafe` if it's 
beneficial.
-      // E.g. Not(EqualTo(x, false)) => EqualTo(x, true)
-      case Not(EqualTo(a, b)) if canSimplifyNot(b) => EqualTo(a, Not(b))
-      case Not(EqualTo(a, b)) if canSimplifyNot(a) => EqualTo(Not(a), b)
-      case Not(EqualNullSafe(a, b)) if !a.nullable && !b.nullable && 
canSimplifyNot(b) =>
-        EqualNullSafe(a, Not(b))
-      case Not(EqualNullSafe(a, b)) if !a.nullable && !b.nullable && 
canSimplifyNot(a) =>
-        EqualNullSafe(Not(a), b)
-    }
-  }
-}
-
-
-/**
  * Simplifies binary comparisons with semantically-equal expressions:
  * 1) Replace '<=>' with 'true' literal.
  * 2) Replace '=', '<=', and '>=' with 'true' literal if both operands are 
non-nullable.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index 66a6a89..935e51c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -116,7 +116,6 @@ object RuleIdCollection {
       "org.apache.spark.sql.catalyst.optimizer.LikeSimplification" ::
       "org.apache.spark.sql.catalyst.optimizer.LimitPushDown" ::
       "org.apache.spark.sql.catalyst.optimizer.LimitPushDownThroughWindow" ::
-      "org.apache.spark.sql.catalyst.optimizer.NotPropagation" ::
       "org.apache.spark.sql.catalyst.optimizer.NullDownPropagation" ::
       "org.apache.spark.sql.catalyst.optimizer.NullPropagation" ::
       "org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning" ::
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NotPropagationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NotPropagationSuite.scala
deleted file mode 100644
index d950609..0000000
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NotPropagationSuite.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.types.BooleanType
-
-class NotPropagationSuite extends PlanTest with ExpressionEvalHelper {
-
-  object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches =
-      Batch("AnalysisNodes", Once, EliminateSubqueryAliases) ::
-      Batch("Not Propagation", FixedPoint(50),
-        NullPropagation,
-        NullDownPropagation,
-        ConstantFolding,
-        SimplifyConditionals,
-        BooleanSimplification,
-        NotPropagation,
-        PruneFilters) :: Nil
-  }
-
-  val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.string,
-    'e.boolean, 'f.boolean, 'g.boolean, 'h.boolean)
-
-  val testRelationWithData = LocalRelation.fromExternalRows(
-    testRelation.output, Seq(Row(1, 2, 3, "abc"))
-  )
-
-  private def checkCondition(input: Expression, expected: LogicalPlan): Unit = 
{
-    val plan = testRelationWithData.where(input).analyze
-    val actual = Optimize.execute(plan)
-    comparePlans(actual, expected)
-  }
-
-  private def checkCondition(input: Expression, expected: Expression): Unit = {
-    val plan = testRelation.where(input).analyze
-    val actual = Optimize.execute(plan)
-    val correctAnswer = testRelation.where(expected).analyze
-    comparePlans(actual, correctAnswer)
-  }
-
-  test("Using (Not(a) === b) == (a === Not(b)), (Not(a) <=> b) == (a <=> 
Not(b)) rules") {
-    checkCondition(Not('e) === Literal(true), 'e === Literal(false))
-    checkCondition(Not('e) === Literal(false), 'e === Literal(true))
-    checkCondition(Not('e) === Literal(null, BooleanType), testRelation)
-    checkCondition(Literal(true) === Not('e), Literal(false) === 'e)
-    checkCondition(Literal(false) === Not('e), Literal(true) === 'e)
-    checkCondition(Literal(null, BooleanType) === Not('e), testRelation)
-    checkCondition(Not('e) <=> Literal(true), 'e <=> Literal(false))
-    checkCondition(Not('e) <=> Literal(false), 'e <=> Literal(true))
-    checkCondition(Not('e) <=> Literal(null, BooleanType), IsNull('e))
-    checkCondition(Literal(true) <=> Not('e), Literal(false) <=> 'e)
-    checkCondition(Literal(false) <=> Not('e), Literal(true) <=> 'e)
-    checkCondition(Literal(null, BooleanType) <=> Not('e), IsNull('e))
-
-    checkCondition(Not('e) === Not('f), 'e === 'f)
-    checkCondition(Not('e) <=> Not('f), 'e <=> 'f)
-
-    checkCondition(IsNull('e) === Not('f), IsNotNull('e) === 'f)
-    checkCondition(Not('e) === IsNull('f), 'e === IsNotNull('f))
-    checkCondition(IsNull('e) <=> Not('f), IsNotNull('e) <=> 'f)
-    checkCondition(Not('e) <=> IsNull('f), 'e <=> IsNotNull('f))
-
-    checkCondition(IsNotNull('e) === Not('f), IsNull('e) === 'f)
-    checkCondition(Not('e) === IsNotNull('f), 'e === IsNull('f))
-    checkCondition(IsNotNull('e) <=> Not('f), IsNull('e) <=> 'f)
-    checkCondition(Not('e) <=> IsNotNull('f), 'e <=> IsNull('f))
-
-    checkCondition(Not('e) === Not(And('f, 'g)), 'e === And('f, 'g))
-    checkCondition(Not(And('e, 'f)) === Not('g), And('e, 'f) === 'g)
-    checkCondition(Not('e) <=> Not(And('f, 'g)), 'e <=> And('f, 'g))
-    checkCondition(Not(And('e, 'f)) <=> Not('g), And('e, 'f) <=> 'g)
-
-    checkCondition(Not('e) === Not(Or('f, 'g)), 'e === Or('f, 'g))
-    checkCondition(Not(Or('e, 'f)) === Not('g), Or('e, 'f) === 'g)
-    checkCondition(Not('e) <=> Not(Or('f, 'g)), 'e <=> Or('f, 'g))
-    checkCondition(Not(Or('e, 'f)) <=> Not('g), Or('e, 'f) <=> 'g)
-
-    checkCondition(('a > 'b) === Not('f), ('a <= 'b) === 'f)
-    checkCondition(Not('e) === ('a > 'b), 'e === ('a <= 'b))
-    checkCondition(('a > 'b) <=> Not('f), ('a <= 'b) <=> 'f)
-    checkCondition(Not('e) <=> ('a > 'b), 'e <=> ('a <= 'b))
-
-    checkCondition(('a >= 'b) === Not('f), ('a < 'b) === 'f)
-    checkCondition(Not('e) === ('a >= 'b), 'e === ('a < 'b))
-    checkCondition(('a >= 'b) <=> Not('f), ('a < 'b) <=> 'f)
-    checkCondition(Not('e) <=> ('a >= 'b), 'e <=> ('a < 'b))
-
-    checkCondition(('a < 'b) === Not('f), ('a >= 'b) === 'f)
-    checkCondition(Not('e) === ('a < 'b), 'e === ('a >= 'b))
-    checkCondition(('a < 'b) <=> Not('f), ('a >= 'b) <=> 'f)
-    checkCondition(Not('e) <=> ('a < 'b), 'e <=> ('a >= 'b))
-
-    checkCondition(('a <= 'b) === Not('f), ('a > 'b) === 'f)
-    checkCondition(Not('e) === ('a <= 'b), 'e === ('a > 'b))
-    checkCondition(('a <= 'b) <=> Not('f), ('a > 'b) <=> 'f)
-    checkCondition(Not('e) <=> ('a <= 'b), 'e <=> ('a > 'b))
-  }
-
-  test("Using (a =!= b) == (a === Not(b)), Not(a <=> b) == (a <=> Not(b)) 
rules") {
-    checkCondition('e =!= Literal(true), 'e === Literal(false))
-    checkCondition('e =!= Literal(false), 'e === Literal(true))
-    checkCondition('e =!= Literal(null, BooleanType), testRelation)
-    checkCondition(Literal(true) =!= 'e, Literal(false) === 'e)
-    checkCondition(Literal(false) =!= 'e, Literal(true) === 'e)
-    checkCondition(Literal(null, BooleanType) =!= 'e, testRelation)
-    checkCondition(Not(('a <=> 'b) <=> Literal(true)), ('a <=> 'b) <=> 
Literal(false))
-    checkCondition(Not(('a <=> 'b) <=> Literal(false)), ('a <=> 'b) <=> 
Literal(true))
-    checkCondition(Not(('a <=> 'b) <=> Literal(null, BooleanType)), 
testRelationWithData)
-    checkCondition(Not(Literal(true) <=> ('a <=> 'b)), Literal(false) <=> ('a 
<=> 'b))
-    checkCondition(Not(Literal(false) <=> ('a <=> 'b)), Literal(true) <=> ('a 
<=> 'b))
-    checkCondition(Not(Literal(null, BooleanType) <=> IsNull('e)), 
testRelationWithData)
-
-    checkCondition('e =!= Not('f), 'e === 'f)
-    checkCondition(Not('e) =!= 'f, 'e === 'f)
-    checkCondition(Not(('a <=> 'b) <=> Not(('b <=> 'c))), ('a <=> 'b) <=> ('b 
<=> 'c))
-    checkCondition(Not(Not(('a <=> 'b)) <=> ('b <=> 'c)), ('a <=> 'b) <=> ('b 
<=> 'c))
-
-    checkCondition('e =!= IsNull('f), 'e === IsNotNull('f))
-    checkCondition(IsNull('e) =!= 'f, IsNotNull('e) === 'f)
-    checkCondition(Not(('a <=> 'b) <=> IsNull('f)), ('a <=> 'b) <=> 
IsNotNull('f))
-    checkCondition(Not(IsNull('e) <=> ('b <=> 'c)), IsNotNull('e) <=> ('b <=> 
'c))
-
-    checkCondition('e =!= IsNotNull('f), 'e === IsNull('f))
-    checkCondition(IsNotNull('e) =!= 'f, IsNull('e) === 'f)
-    checkCondition(Not(('a <=> 'b) <=> IsNotNull('f)), ('a <=> 'b) <=> 
IsNull('f))
-    checkCondition(Not(IsNotNull('e) <=> ('b <=> 'c)), IsNull('e) <=> ('b <=> 
'c))
-
-    checkCondition('e =!= Not(And('f, 'g)), 'e === And('f, 'g))
-    checkCondition(Not(And('e, 'f)) =!= 'g, And('e, 'f) === 'g)
-    checkCondition('e =!= Not(Or('f, 'g)), 'e === Or('f, 'g))
-    checkCondition(Not(Or('e, 'f)) =!= 'g, Or('e, 'f) === 'g)
-
-    checkCondition(('a > 'b) =!= 'f, ('a <= 'b) === 'f)
-    checkCondition('e =!= ('a > 'b), 'e === ('a <= 'b))
-    checkCondition(('a >= 'b) =!= 'f, ('a < 'b) === 'f)
-    checkCondition('e =!= ('a >= 'b), 'e === ('a < 'b))
-    checkCondition(('a < 'b) =!= 'f, ('a >= 'b) === 'f)
-    checkCondition('e =!= ('a < 'b), 'e === ('a >= 'b))
-    checkCondition(('a <= 'b) =!= 'f, ('a > 'b) === 'f)
-    checkCondition('e =!= ('a <= 'b), 'e === ('a > 'b))
-
-    checkCondition('e =!= ('f === ('g === Not('h))), 'e === ('f === ('g === 
'h)))
-
-  }
-
-  test("Properly avoid non optimize-able cases") {
-    checkCondition(Not(('a > 'b) <=> 'f), Not(('a > 'b) <=> 'f))
-    checkCondition(Not('e <=> ('a > 'b)), Not('e <=> ('a > 'b)))
-    checkCondition(('a === 'b) =!= ('a === 'c), ('a === 'b) =!= ('a === 'c))
-    checkCondition(('a === 'b) =!= ('c in(1, 2, 3)), ('a === 'b) =!= ('c in(1, 
2, 3)))
-  }
-}
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullDownPropagationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullDownPropagationSuite.scala
index c9d1f33..7097ebd 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullDownPropagationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullDownPropagationSuite.scala
@@ -36,7 +36,6 @@ class NullDownPropagationSuite extends PlanTest with 
ExpressionEvalHelper {
         ConstantFolding,
         SimplifyConditionals,
         BooleanSimplification,
-        NotPropagation,
         PruneFilters) :: Nil
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index a376c9c..89157be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -1956,4 +1956,30 @@ class SubquerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
     assert(!nonDeterministicQueryPlan.deterministic)
   }
 
+  test("SPARK-38132: Not IN subquery correctness checks") {
+    val t = "test_table"
+    withTable(t) {
+      Seq[(Integer, Integer)](
+        (1, 1),
+        (2, 2),
+        (3, 3),
+        (4, null),
+        (null, 0))
+        .toDF("c1", "c2").write.saveAsTable(t)
+      val df = spark.table(t)
+
+      checkAnswer(df.where(s"(c1 NOT IN (SELECT c2 FROM $t)) = true"), 
Seq.empty)
+      checkAnswer(df.where(s"(c1 NOT IN (SELECT c2 FROM $t WHERE c2 IS NOT 
NULL)) = true"),
+        Row(4, null) :: Nil)
+      checkAnswer(df.where(s"(c1 NOT IN (SELECT c2 FROM $t)) <=> true"), 
Seq.empty)
+      checkAnswer(df.where(s"(c1 NOT IN (SELECT c2 FROM $t WHERE c2 IS NOT 
NULL)) <=> true"),
+        Row(4, null) :: Nil)
+      checkAnswer(df.where(s"(c1 NOT IN (SELECT c2 FROM $t)) != false"), 
Seq.empty)
+      checkAnswer(df.where(s"(c1 NOT IN (SELECT c2 FROM $t WHERE c2 IS NOT 
NULL)) != false"),
+        Row(4, null) :: Nil)
+      checkAnswer(df.where(s"NOT((c1 NOT IN (SELECT c2 FROM $t)) <=> false)"), 
Seq.empty)
+      checkAnswer(df.where(s"NOT((c1 NOT IN (SELECT c2 FROM $t WHERE c2 IS NOT 
NULL)) <=> false)"),
+        Row(4, null) :: Nil)
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to