Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0e0d83a59 -> 00a2e01e8


[SPARK-18058][SQL] [BRANCH-2.0]Comparing column types ignoring Nullability in 
Union and SetOperation

## What changes were proposed in this pull request?

The PR tries to fix 
[SPARK-18058](https://issues.apache.org/jira/browse/SPARK-18058) which refers 
to a bug that the column types are compared with the extra care about 
Nullability in Union and SetOperation.

This PR converts the columns types by setting all fields as nullable before 
comparison

## How was this patch tested?

regular unit test cases

Author: CodingCat <[email protected]>

Closes #15602 from CodingCat/branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00a2e01e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00a2e01e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00a2e01e

Branch: refs/heads/branch-2.0
Commit: 00a2e01e8d5afde21cc01476adf0215a316ae936
Parents: 0e0d83a
Author: CodingCat <[email protected]>
Authored: Sun Oct 23 22:20:18 2016 +0200
Committer: Herman van Hovell <[email protected]>
Committed: Sun Oct 23 22:20:18 2016 +0200

----------------------------------------------------------------------
 .../plans/logical/basicLogicalOperators.scala   | 29 +++++++-------------
 .../sql/catalyst/analysis/AnalysisSuite.scala   | 14 ++++++++++
 2 files changed, 24 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/00a2e01e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 07e39b0..2fdaa05 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -116,6 +116,8 @@ case class Filter(condition: Expression, child: LogicalPlan)
 
 abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends 
BinaryNode {
 
+  def duplicateResolved: Boolean = 
left.outputSet.intersect(right.outputSet).isEmpty
+
   protected def leftConstraints: Set[Expression] = left.constraints
 
   protected def rightConstraints: Set[Expression] = {
@@ -125,6 +127,13 @@ abstract class SetOperation(left: LogicalPlan, right: 
LogicalPlan) extends Binar
       case a: Attribute => attributeRewrites(a)
     })
   }
+
+  override lazy val resolved: Boolean =
+    childrenResolved &&
+      left.output.length == right.output.length &&
+      left.output.zip(right.output).forall {
+        case (l, r) => l.dataType.asNullable == r.dataType.asNullable } &&
+      duplicateResolved
 }
 
 object SetOperation {
@@ -133,8 +142,6 @@ object SetOperation {
 
 case class Intersect(left: LogicalPlan, right: LogicalPlan) extends 
SetOperation(left, right) {
 
-  def duplicateResolved: Boolean = 
left.outputSet.intersect(right.outputSet).isEmpty
-
   override def output: Seq[Attribute] =
     left.output.zip(right.output).map { case (leftAttr, rightAttr) =>
       leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
@@ -143,14 +150,6 @@ case class Intersect(left: LogicalPlan, right: 
LogicalPlan) extends SetOperation
   override protected def validConstraints: Set[Expression] =
     leftConstraints.union(rightConstraints)
 
-  // Intersect are only resolved if they don't introduce ambiguous expression 
ids,
-  // since the Optimizer will convert Intersect to Join.
-  override lazy val resolved: Boolean =
-    childrenResolved &&
-      left.output.length == right.output.length &&
-      left.output.zip(right.output).forall { case (l, r) => l.dataType == 
r.dataType } &&
-      duplicateResolved
-
   override def maxRows: Option[Long] = {
     if (children.exists(_.maxRows.isEmpty)) {
       None
@@ -171,19 +170,11 @@ case class Intersect(left: LogicalPlan, right: 
LogicalPlan) extends SetOperation
 
 case class Except(left: LogicalPlan, right: LogicalPlan) extends 
SetOperation(left, right) {
 
-  def duplicateResolved: Boolean = 
left.outputSet.intersect(right.outputSet).isEmpty
-
   /** We don't use right.output because those rows get excluded from the set. 
*/
   override def output: Seq[Attribute] = left.output
 
   override protected def validConstraints: Set[Expression] = leftConstraints
 
-  override lazy val resolved: Boolean =
-    childrenResolved &&
-      left.output.length == right.output.length &&
-      left.output.zip(right.output).forall { case (l, r) => l.dataType == 
r.dataType } &&
-      duplicateResolved
-
   override lazy val statistics: Statistics = {
     left.statistics.copy()
   }
@@ -218,7 +209,7 @@ case class Union(children: Seq[LogicalPlan]) extends 
LogicalPlan {
         child.output.length == children.head.output.length &&
         // compare the data types with the first child
         child.output.zip(children.head.output).forall {
-          case (l, r) => l.dataType == r.dataType }
+          case (l, r) => l.dataType.asNullable == r.dataType.asNullable }
       )
 
     children.length > 1 && childrenResolved && allChildrenCompatible

http://git-wip-us.apache.org/repos/asf/spark/blob/00a2e01e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 102c78b..1ba4f4a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -377,4 +377,18 @@ class AnalysisSuite extends AnalysisTest {
     assertExpressionType(sum(Divide(Decimal(1), 2.0)), DoubleType)
     assertExpressionType(sum(Divide(1.0, Decimal(2.0))), DoubleType)
   }
+
+  test("SPARK-18058: union and set operations shall not care about the 
nullability" +
+    " when comparing column types") {
+    val firstTable = LocalRelation(
+      AttributeReference("a",
+        StructType(Seq(StructField("a", IntegerType, nullable = true))), 
nullable = false)())
+    val secondTable = LocalRelation(
+      AttributeReference("a",
+        StructType(Seq(StructField("a", IntegerType, nullable = false))), 
nullable = false)())
+
+    assertAnalysisSuccess(Union(firstTable, secondTable))
+    assertAnalysisSuccess(Except(firstTable, secondTable))
+    assertAnalysisSuccess(Intersect(firstTable, secondTable))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to