This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new d9d3bea  Revert "[SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL as False"
d9d3bea is described below

commit d9d3beafad8dbee5d21f062a181343b8640d2ccd
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Fri Dec 21 19:57:07 2018 -0800

    Revert "[SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should 
consider NULL as False"
    
    This reverts commit a7d50ae24a5f92e8d9b6622436f0bb4c2e06cbe1.
---
 .../optimizer/ReplaceExceptWithFilter.scala        | 32 +++++++---------
 .../catalyst/optimizer/ReplaceOperatorSuite.scala  | 44 ++++++----------------
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 11 ------
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 38 -------------------
 4 files changed, 24 insertions(+), 101 deletions(-)
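
For context: ReplaceExceptWithFilter rewrites "left EXCEPT right", when the
right side is just a filter over the same relation, into
Distinct(Filter(Not(condition), left)). Under SQL three-valued logic, a row
for which the condition evaluates to NULL is not returned by the right side,
so EXCEPT must keep it, while a bare Not(condition) would drop it. The fix
being reverted guarded this with COALESCE(condition, FALSE); the code restored
on branch-2.3 relies on IsNotNull constraints inferred on the right side. A
minimal sketch of the underlying pitfall in plain Scala, with Option[Boolean]
standing in for a nullable SQL boolean (illustrative only, not Catalyst code):

    object ThreeValuedSketch {
      type SqlBool = Option[Boolean]             // None models SQL NULL

      def not(b: SqlBool): SqlBool = b.map(!_)   // SQL NOT: NOT NULL is NULL

      // COALESCE(cond, FALSE): force NULL to FALSE before negating
      def coalesceFalse(b: SqlBool): SqlBool = Some(b.getOrElse(false))

      def main(args: Array[String]): Unit = {
        val cond: SqlBool = None                 // the condition is NULL for this row
        println(not(cond))                       // None       -> row wrongly dropped
        println(not(coalesceFalse(cond)))        // Some(true) -> row correctly kept
      }
    }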

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
index 08cf160..45edf26 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
@@ -36,8 +36,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
  * Note:
  * Before flipping the filter condition of the right node, we should:
  * 1. Combine all it's [[Filter]].
- * 2. Update the attribute references to the left node;
- * 3. Add a Coalesce(condition, False) (to take into account of NULL values in the condition).
+ * 2. Apply InferFiltersFromConstraints rule (to take into account of NULL values in the condition).
  */
 object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
@@ -48,28 +47,23 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
     plan.transform {
       case e @ Except(left, right) if isEligible(left, right) =>
-        val filterCondition = combineFilters(skipProject(right)).asInstanceOf[Filter].condition
-        if (filterCondition.deterministic) {
-          transformCondition(left, filterCondition).map { c =>
-            Distinct(Filter(Not(c), left))
-          }.getOrElse {
-            e
-          }
-        } else {
+        val newCondition = transformCondition(left, skipProject(right))
+        newCondition.map { c =>
+          Distinct(Filter(Not(c), left))
+        }.getOrElse {
           e
         }
     }
   }
 
-  private def transformCondition(plan: LogicalPlan, condition: Expression): Option[Expression] = {
-    val attributeNameMap: Map[String, Attribute] = plan.output.map(x => (x.name, x)).toMap
-    if (condition.references.forall(r => attributeNameMap.contains(r.name))) {
-      val rewrittenCondition = condition.transform {
-        case a: AttributeReference => attributeNameMap(a.name)
-      }
-      // We need to consider as False when the condition is NULL, otherwise we do not return those
-      // rows containing NULL which are instead filtered in the Except right plan
-      Some(Coalesce(Seq(rewrittenCondition, Literal.FalseLiteral)))
+  private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = {
+    val filterCondition =
+      InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
+
+    val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap
+
+    if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) {
+      Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) })
     } else {
       None
     }
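
The hunk above swaps the two null-handling strategies. The reverted SPARK-26366
fix wrapped the rewritten condition in Coalesce(condition, FalseLiteral), so a
NULL condition negates to TRUE and the row is kept; the code restored by this
revert instead runs InferFiltersFromConstraints over the right-side Filter so
that IsNotNull guards account for NULL values in the condition. A hedged
sketch of the plan shape the reverted fix produced (names follow the diff
above; this is not the committed code itself):

    import org.apache.spark.sql.catalyst.expressions.{Coalesce, Expression, Literal, Not}
    import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Filter, LogicalPlan}

    // left EXCEPT Filter(cond, left)
    //   ==>  Distinct(Filter(NOT coalesce(cond, FALSE), left))
    def revertedFixShape(cond: Expression, left: LogicalPlan): LogicalPlan =
      Distinct(Filter(Not(Coalesce(Seq(cond, Literal.FalseLiteral))), left))

The restored code builds the same Distinct(Filter(Not(...), left)) shape, but
negates the constraint-augmented condition directly, as the + lines show.
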
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index 78d3969..52dc2e9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -20,12 +20,11 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce, If, Literal, Not}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Not}
 import org.apache.spark.sql.catalyst.expressions.aggregate.First
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.types.BooleanType
 
 class ReplaceOperatorSuite extends PlanTest {
 
@@ -66,7 +65,8 @@ class ReplaceOperatorSuite extends PlanTest {
 
     val correctAnswer =
       Aggregate(table1.output, table1.output,
-        Filter(Not(Coalesce(Seq(attributeA >= 2 && attributeB < 1, Literal.FalseLiteral))),
+        Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
+          (attributeA >= 2 && attributeB < 1)),
           Filter(attributeB === 2, Filter(attributeA === 1, table1)))).analyze
 
     comparePlans(optimized, correctAnswer)
@@ -84,8 +84,8 @@ class ReplaceOperatorSuite extends PlanTest {
 
     val correctAnswer =
       Aggregate(table1.output, table1.output,
-        Filter(Not(Coalesce(Seq(attributeA >= 2 && attributeB < 1, Literal.FalseLiteral))),
-          table1)).analyze
+        Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
+          (attributeA >= 2 && attributeB < 1)), table1)).analyze
 
     comparePlans(optimized, correctAnswer)
   }
@@ -104,7 +104,8 @@ class ReplaceOperatorSuite extends PlanTest {
 
     val correctAnswer =
       Aggregate(table1.output, table1.output,
-        Filter(Not(Coalesce(Seq(attributeA >= 2 && attributeB < 1, Literal.FalseLiteral))),
+        Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
+          (attributeA >= 2 && attributeB < 1)),
           Project(Seq(attributeA, attributeB), table1))).analyze
 
     comparePlans(optimized, correctAnswer)
@@ -124,7 +125,8 @@ class ReplaceOperatorSuite extends PlanTest {
 
     val correctAnswer =
       Aggregate(table1.output, table1.output,
-          Filter(Not(Coalesce(Seq(attributeA >= 2 && attributeB < 1, Literal.FalseLiteral))),
+          Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
+            (attributeA >= 2 && attributeB < 1)),
             Filter(attributeB === 2, Filter(attributeA === 1, table1)))).analyze
 
     comparePlans(optimized, correctAnswer)
@@ -144,7 +146,8 @@ class ReplaceOperatorSuite extends PlanTest {
 
     val correctAnswer =
       Aggregate(table1.output, table1.output,
-        Filter(Not(Coalesce(Seq(attributeA === 1 && attributeB === 2, Literal.FalseLiteral))),
+        Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
+          (attributeA === 1 && attributeB === 2)),
           Project(Seq(attributeA, attributeB),
             Filter(attributeB < 1, Filter(attributeA >= 2, table1))))).analyze
 
@@ -226,29 +229,4 @@ class ReplaceOperatorSuite extends PlanTest {
 
     comparePlans(optimized, query)
   }
-
-  test("SPARK-26366: ReplaceExceptWithFilter should handle properly NULL") {
-    val basePlan = LocalRelation(Seq('a.int, 'b.int))
-    val otherPlan = basePlan.where('a.in(1, 2) || 'b.in())
-    val except = Except(basePlan, otherPlan)
-    val result = OptimizeIn(Optimize.execute(except.analyze))
-    val correctAnswer = Aggregate(basePlan.output, basePlan.output,
-      Filter(!Coalesce(Seq(
-        'a.in(1, 2) || If('b.isNotNull, Literal.FalseLiteral, Literal(null, BooleanType)),
-        Literal.FalseLiteral)),
-        basePlan)).analyze
-    comparePlans(result, correctAnswer)
-  }
-
-  test("SPARK-26366: ReplaceExceptWithFilter should not transform 
non-detrministic") {
-    val basePlan = LocalRelation(Seq('a.int, 'b.int))
-    val otherPlan = basePlan.where('a > rand(1L))
-    val except = Except(basePlan, otherPlan)
-    val result = Optimize.execute(except.analyze)
-    val condition = basePlan.output.zip(otherPlan.output).map { case (a1, a2) =>
-      a1 <=> a2 }.reduce( _ && _)
-    val correctAnswer = Aggregate(basePlan.output, otherPlan.output,
-      Join(basePlan, otherPlan, LeftAnti, Option(condition))).analyze
-    comparePlans(result, correctAnswer)
-  }
 }
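
The second deleted test pinned down the fallback path: with a
non-deterministic condition (rand), ReplaceExceptWithFilter must not fire, and
the Except is planned as a left-anti join on null-safe equality (<=>), since
EXCEPT matches rows treating NULLs as equal. A plain-Scala sketch of why
ordinary equality would not do (illustrative only, not Catalyst code):

    // SQL '=': NULL = anything is NULL, so a NULL row never finds its match
    def sqlEq(a: Option[Int], b: Option[Int]): Option[Boolean] =
      for (x <- a; y <- b) yield x == y

    // SQL '<=>': NULLs compare equal, as EXCEPT's row matching requires
    def nullSafeEq(a: Option[Int], b: Option[Int]): Boolean = a == b

    // sqlEq(None, None)      == None   (no match under '=')
    // nullSafeEq(None, None) == true   (match under '<=>')
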
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 522ed8d..3b7bd84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1467,17 +1467,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     checkAnswer(df.groupBy(col("a")).agg(first(col("b"))),
       Seq(Row("0", BigDecimal.valueOf(0.1111)), Row("1", 
BigDecimal.valueOf(1.1111))))
   }
-
-  test("SPARK-26366: return nulls which are not filtered in except") {
-    val inputDF = sqlContext.createDataFrame(
-      sparkContext.parallelize(Seq(Row("0", "a"), Row("1", null))),
-      StructType(Seq(
-        StructField("a", StringType, nullable = true),
-        StructField("b", StringType, nullable = true))))
-
-    val exceptDF = inputDF.filter(col("a").isin("0") or col("b") > "c")
-    checkAnswer(inputDF.except(exceptDF), Seq(Row("1", null)))
-  }
 }
 
 case class TestDataUnion(x: Int, y: Int, z: Int)
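
The deleted DatasetSuite test is easy to replay interactively. A hedged
spark-shell sketch of the same scenario (assumes a running SparkSession named
spark; the expected output is inferred from the deleted checkAnswer):

    import org.apache.spark.sql.functions.col
    import spark.implicits._

    val input = Seq(("0", "a"), ("1", null)).toDF("a", "b")
    val right = input.filter(col("a").isin("0") or col("b") > "c")

    // For the ("1", null) row the condition is FALSE OR NULL = NULL, so the
    // row does not match the right side and EXCEPT must return it.
    input.except(right).show()
    // +---+----+
    // |  a|   b|
    // +---+----+
    // |  1|null|
    // +---+----+
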
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6848b66..0af6d87 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2831,44 +2831,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(sql("select 26393499451 / (1e6 * 1000)"), 
Row(BigDecimal("26.3934994510000")))
     }
   }
-
-  test("SPARK-26366: verify ReplaceExceptWithFilter") {
-    Seq(true, false).foreach { enabled =>
-      withSQLConf(SQLConf.REPLACE_EXCEPT_WITH_FILTER.key -> enabled.toString) {
-        val df = spark.createDataFrame(
-          sparkContext.parallelize(Seq(Row(0, 3, 5),
-            Row(0, 3, null),
-            Row(null, 3, 5),
-            Row(0, null, 5),
-            Row(0, null, null),
-            Row(null, null, 5),
-            Row(null, 3, null),
-            Row(null, null, null))),
-          StructType(Seq(StructField("c1", IntegerType),
-            StructField("c2", IntegerType),
-            StructField("c3", IntegerType))))
-        val where = "c2 >= 3 OR c1 >= 0"
-        val whereNullSafe =
-          """
-            |(c2 IS NOT NULL AND c2 >= 3)
-            |OR (c1 IS NOT NULL AND c1 >= 0)
-          """.stripMargin
-
-        val df_a = df.filter(where)
-        val df_b = df.filter(whereNullSafe)
-        checkAnswer(df.except(df_a), df.except(df_b))
-
-        val whereWithIn = "c2 >= 3 OR c1 in (2)"
-        val whereWithInNullSafe =
-          """
-            |(c2 IS NOT NULL AND c2 >= 3)
-          """.stripMargin
-        val dfIn_a = df.filter(whereWithIn)
-        val dfIn_b = df.filter(whereWithInNullSafe)
-        checkAnswer(df.except(dfIn_a), df.except(dfIn_b))
-      }
-    }
-  }
 }
 
 case class Foo(bar: Option[String])
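
The deleted SQLQuerySuite test encoded an invariant that is worth keeping in
mind: with the rewrite enabled or disabled, EXCEPT against a plain predicate
must equal EXCEPT against its explicitly null-guarded form. A hedged sketch of
the same check in spark-shell terms (assumes a SparkSession named spark):

    import spark.implicits._

    val df = Seq((Some(0), Some(3)), (None, Some(3)), (Some(0), None),
      (None: Option[Int], None: Option[Int])).toDF("c1", "c2")

    val where = "c2 >= 3 OR c1 >= 0"
    val whereNullSafe = "(c2 IS NOT NULL AND c2 >= 3) OR (c1 IS NOT NULL AND c1 >= 0)"

    // The two EXCEPT results must agree row for row.
    assert(df.except(df.filter(where)).collect().toSet ==
      df.except(df.filter(whereNullSafe)).collect().toSet)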


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
