Repository: spark
Updated Branches:
  refs/heads/branch-2.0 fef3ec151 -> c02bc926d


[SPARK-17100] [SQL] fix Python udf in filter on top of outer join

## What changes were proposed in this pull request?

In the optimizer, we try to evaluate the filter condition to see whether it
always evaluates to null or false, but some expressions (e.g. Python UDFs) are
not evaluable, so we should check for that before evaluating the condition.
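The idea behind the fix can be sketched in plain Python (no Spark dependency). The names below (Attr, IsNotNull, Unevaluable, can_filter_out_null) are illustrative stand-ins for Spark's Catalyst classes, not its real API: the optimizer evaluates the filter condition against an all-null row to decide whether an outer join can become an inner join, and the fix adds a guard that bails out when the condition contains an unevaluable expression such as a Python UDF.

```python
class Expr:
    """Minimal expression-tree node (illustrative, not Catalyst)."""
    children = ()

    def eval(self, row):
        raise NotImplementedError


class Attr(Expr):
    """A column reference; against an all-null row it evaluates to None."""
    def eval(self, row):
        return row.get(self, None)


class IsNotNull(Expr):
    def __init__(self, child):
        self.children = (child,)

    def eval(self, row):
        return self.children[0].eval(row) is not None


class Unevaluable(Expr):
    """Stand-in for expressions the optimizer cannot evaluate eagerly,
    e.g. a Python UDF."""
    def __init__(self, *children):
        self.children = children

    def eval(self, row):
        raise RuntimeError("cannot evaluate this expression eagerly")


def contains_unevaluable(e):
    return isinstance(e, Unevaluable) or any(
        contains_unevaluable(c) for c in e.children)


def can_filter_out_null(e):
    """Would this filter condition reject a row of all nulls?

    Before the fix, eval() was called unconditionally and crashed on
    unevaluable expressions; the fix checks for them first and
    conservatively answers False.
    """
    if contains_unevaluable(e):   # the guard added by this patch
        return False
    v = e.eval({})                # evaluate against an all-null row
    return v is None or v is False


# An ordinary null-rejecting predicate allows the outer-to-inner join
# conversion; a condition containing a UDF is conservatively left alone.
print(can_filter_out_null(IsNotNull(Attr())))      # null-rejecting
print(can_filter_out_null(Unevaluable(Attr())))    # contains a "UDF"
```

This mirrors the two-line change in EliminateOuterJoin below: find any Unevaluable node before calling eval, and return false if one exists.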

## How was this patch tested?

Added regression tests.

Author: Davies Liu <dav...@databricks.com>

Closes #15103 from davies/udf_join.

(cherry picked from commit d8104158a922d86dd4f00e50d5d7dddc7b777a21)
Signed-off-by: Davies Liu <davies....@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c02bc926
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c02bc926
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c02bc926

Branch: refs/heads/branch-2.0
Commit: c02bc926d71c406109870740dcdba68785a2c5d2
Parents: fef3ec1
Author: Davies Liu <dav...@databricks.com>
Authored: Mon Sep 19 13:24:16 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Mon Sep 19 13:24:25 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/tests.py                                  | 8 ++++++++
 .../org/apache/spark/sql/catalyst/optimizer/joins.scala      | 4 +++-
 2 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c02bc926/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index d3634fc..1ec40ce 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -323,6 +323,14 @@ class SQLTests(ReusedPySparkTestCase):
         [row] = self.spark.sql("SELECT double(add(1, 2)), add(double(2), 1)").collect()
         self.assertEqual(tuple(row), (6, 5))
 
+    def test_udf_in_filter_on_top_of_outer_join(self):
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1)])
+        right = self.spark.createDataFrame([Row(a=1)])
+        df = left.join(right, on='a', how='left_outer')
+        df = df.withColumn('b', udf(lambda x: 'x')(df.a))
+        self.assertEqual(df.filter('b = "x"').collect(), [Row(a=1, b='x')])
+
     def test_udf_without_arguments(self):
         self.spark.catalog.registerFunction("foo", lambda: "bar")
         [row] = self.spark.sql("SELECT foo()").collect()

http://git-wip-us.apache.org/repos/asf/spark/blob/c02bc926/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 158ad3d..ae4cd8e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -104,7 +104,9 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
     if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false
     val attributes = e.references.toSeq
     val emptyRow = new GenericInternalRow(attributes.length)
-    val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
+    val boundE = BindReferences.bindReference(e, attributes)
+    if (boundE.find(_.isInstanceOf[Unevaluable]).isDefined) return false
+    val v = boundE.eval(emptyRow)
     v == null || v == false
   }
 

