Repository: spark
Updated Branches:
  refs/heads/master c9d7d83ed -> 368513048


[SPARK-25690][SQL] Analyzer rule HandleNullInputsForUDF does not stabilize and 
can be applied infinitely

## What changes were proposed in this pull request?

The HandleNullInputsForUDF rule can generate new If node infinitely, thus 
causing problems like match of SQL cache missed.
This was fixed in SPARK-24891 and was then broken by SPARK-25044.
The unit test in `AnalysisSuite` added in SPARK-24891 should have failed but 
didn't because it wasn't properly updated after the `ScalaUDF` constructor 
signature change. So this PR also updates the test accordingly based on the new 
`ScalaUDF` constructor.

## How was this patch tested?

Updated the original UT. This should be justified as the original UT became 
invalid after SPARK-25044.

Closes #22701 from maryannxue/spark-25690.

Authored-by: maryannxue <maryann...@apache.org>
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36851304
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36851304
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36851304

Branch: refs/heads/master
Commit: 368513048198efcee8c9a35678b608be0cb9ad48
Parents: c9d7d83
Author: maryannxue <maryann...@apache.org>
Authored: Thu Oct 11 20:45:08 2018 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Oct 11 20:45:08 2018 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala  | 4 +++-
 .../org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala   | 4 ++--
 2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/36851304/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d72e512..7f641ac 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2150,8 +2150,10 @@ class Analyzer(
 
             // TODO: skip null handling for not-nullable primitive inputs 
after we can completely
             // trust the `nullable` information.
+            val needsNullCheck = (nullable: Boolean, expr: Expression) =>
+              nullable && !expr.isInstanceOf[KnownNotNull]
             val inputsNullCheck = nullableTypes.zip(inputs)
-              .filter { case (nullable, _) => !nullable }
+              .filter { case (nullableType, expr) => 
needsNullCheck(!nullableType, expr) }
               .map { case (_, expr) => IsNull(expr) }
               .reduceLeftOption[Expression]((e1, e2) => Or(e1, e2))
             // Once we add an `If` check above the udf, it is safe to mark 
those checked inputs

http://git-wip-us.apache.org/repos/asf/spark/blob/36851304/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index f9facbb..cf76c92 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -351,8 +351,8 @@ class AnalysisSuite extends AnalysisTest with Matchers {
   test("SPARK-24891 Fix HandleNullInputsForUDF rule") {
     val a = testRelation.output(0)
     val func = (x: Int, y: Int) => x + y
-    val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil)
-    val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil)
+    val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil, nullableTypes = 
false :: false :: Nil)
+    val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil, nullableTypes = 
false :: false :: Nil)
     val plan = Project(Alias(udf2, "")() :: Nil, testRelation)
     comparePlans(plan.analyze, plan.analyze.analyze)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to