Repository: spark
Updated Branches:
  refs/heads/branch-2.3 f36bdb401 -> 28c9adbd6


[SPARK-23802][SQL] PropagateEmptyRelation can leave query plan in unresolved 
state

## What changes were proposed in this pull request?

Add a cast to the nulls introduced by PropagateEmptyRelation so that, in cases 
where they are part of a coalesce, they will not break its type-checking rules.

## How was this patch tested?

Added a unit test.

Author: Robert Kruszewski <robe...@palantir.com>

Closes #20914 from robert3005/rk/propagate-empty-fix.

(cherry picked from commit 5cfd5fabcdbd77a806b98a6dd59b02772d2f6dee)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28c9adbd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28c9adbd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28c9adbd

Branch: refs/heads/branch-2.3
Commit: 28c9adbd6537c1545cc2b448c8908444ca858c44
Parents: f36bdb4
Author: Robert Kruszewski <robe...@palantir.com>
Authored: Tue Apr 3 17:25:54 2018 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Apr 3 17:26:06 2018 -0700

----------------------------------------------------------------------
 .../optimizer/PropagateEmptyRelation.scala      |  8 ++++--
 .../optimizer/PropagateEmptyRelationSuite.scala | 26 ++++++++++++++------
 2 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/28c9adbd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index a6e5aa6..c3fdb92 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.analysis.CastSupport
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Collapse plans consisting empty local relations generated by 
[[PruneFilters]].
@@ -32,7 +34,7 @@ import org.apache.spark.sql.catalyst.rules._
  *    - Aggregate with all empty children and at least one grouping expression.
  *    - Generate(Explode) with all empty children. Others like Hive UDTF may 
return results.
  */
-object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
+object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper 
with CastSupport {
   private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
     case p: LocalRelation => p.data.isEmpty
     case _ => false
@@ -43,7 +45,9 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with 
PredicateHelper {
 
   // Construct a project list from plan's output, while the value is always 
NULL.
   private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
-    plan.output.map{ a => Alias(Literal(null), a.name)(a.exprId) }
+    plan.output.map{ a => Alias(cast(Literal(null), a.dataType), 
a.name)(a.exprId) }
+
+  override def conf: SQLConf = SQLConf.get
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case p: Union if p.children.forall(isEmptyLocalRelation) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/28c9adbd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index 3964508..f1ce754 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, 
LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{IntegerType, StructType}
 
 class PropagateEmptyRelationSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
@@ -37,7 +37,8 @@ class PropagateEmptyRelationSuite extends PlanTest {
         ReplaceIntersectWithSemiJoin,
         PushDownPredicate,
         PruneFilters,
-        PropagateEmptyRelation) :: Nil
+        PropagateEmptyRelation,
+        CollapseProject) :: Nil
   }
 
   object OptimizeWithoutPropagateEmptyRelation extends 
RuleExecutor[LogicalPlan] {
@@ -48,7 +49,8 @@ class PropagateEmptyRelationSuite extends PlanTest {
         ReplaceExceptWithAntiJoin,
         ReplaceIntersectWithSemiJoin,
         PushDownPredicate,
-        PruneFilters) :: Nil
+        PruneFilters,
+        CollapseProject) :: Nil
   }
 
   val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = 
Seq(Row(1)))
@@ -79,9 +81,11 @@ class PropagateEmptyRelationSuite extends PlanTest {
 
       (true, false, Inner, Some(LocalRelation('a.int, 'b.int))),
       (true, false, Cross, Some(LocalRelation('a.int, 'b.int))),
-      (true, false, LeftOuter, Some(Project(Seq('a, Literal(null).as('b)), 
testRelation1).analyze)),
+      (true, false, LeftOuter,
+        Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), 
testRelation1).analyze)),
       (true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
-      (true, false, FullOuter, Some(Project(Seq('a, Literal(null).as('b)), 
testRelation1).analyze)),
+      (true, false, FullOuter,
+        Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), 
testRelation1).analyze)),
       (true, false, LeftAnti, Some(testRelation1)),
       (true, false, LeftSemi, Some(LocalRelation('a.int))),
 
@@ -89,8 +93,9 @@ class PropagateEmptyRelationSuite extends PlanTest {
       (false, true, Cross, Some(LocalRelation('a.int, 'b.int))),
       (false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
       (false, true, RightOuter,
-        Some(Project(Seq(Literal(null).as('a), 'b), testRelation2).analyze)),
-      (false, true, FullOuter, Some(Project(Seq(Literal(null).as('a), 'b), 
testRelation2).analyze)),
+        Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), 
testRelation2).analyze)),
+      (false, true, FullOuter,
+        Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), 
testRelation2).analyze)),
       (false, true, LeftAnti, Some(LocalRelation('a.int))),
       (false, true, LeftSemi, Some(LocalRelation('a.int))),
 
@@ -209,4 +214,11 @@ class PropagateEmptyRelationSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("propagate empty relation keeps the plan resolved") {
+    val query = testRelation1.join(
+      LocalRelation('a.int, 'b.int), UsingJoin(FullOuter, "a" :: Nil), None)
+    val optimized = Optimize.execute(query.analyze)
+    assert(optimized.resolved)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to