Repository: spark
Updated Branches:
  refs/heads/branch-1.3 04b207815 -> 6f10142de


[SPARK-6452] [SQL] Checks for missing attributes and unresolved operator for 
all types of operator

In `CheckAnalysis`, `Filter` and `Aggregate` are matched by their own case 
clauses, so they never reach the later clauses that check for unresolved 
operators and missing input attributes.

This PR also removes the `prettyString` call when generating the error message 
for missing input attributes, because the result of `prettyString` doesn't 
contain expression IDs and may produce confusing messages like

> resolved attributes a missing from a

cc rxin

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height="40" alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5129)
<!-- Reviewable:end -->

Author: Cheng Lian <[email protected]>

Closes #5129 from liancheng/spark-6452 and squashes the following commits:

52cdc69 [Cheng Lian] Addresses comments
029f9bd [Cheng Lian] Checks for missing attributes and unresolved operator for 
all types of operator

(cherry picked from commit 1afcf773d0cafdfd9bf106fdc5c429ed2ba3dd36)
Signed-off-by: Michael Armbrust <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f10142d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f10142d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f10142d

Branch: refs/heads/branch-1.3
Commit: 6f10142de87c8f3725e2a306162ee143fc4ba851
Parents: 04b2078
Author: Cheng Lian <[email protected]>
Authored: Tue Mar 24 01:12:11 2015 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Tue Mar 24 01:12:22 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/analysis/CheckAnalysis.scala     | 15 ++++++++++-----
 .../spark/sql/catalyst/plans/QueryPlan.scala      |  7 +++++--
 .../sql/catalyst/analysis/AnalysisSuite.scala     | 18 ++++++++++++++++++
 3 files changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6f10142d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index fb975ee..425e1e4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -63,7 +63,7 @@ class CheckAnalysis {
               s"filter expression '${f.condition.prettyString}' " +
                 s"of type ${f.condition.dataType.simpleString} is not a 
boolean.")
 
-          case aggregatePlan@Aggregate(groupingExprs, aggregateExprs, child) =>
+          case Aggregate(groupingExprs, aggregateExprs, child) =>
             def checkValidAggregateExpression(expr: Expression): Unit = expr 
match {
               case _: AggregateExpression => // OK
               case e: Attribute if !groupingExprs.contains(e) =>
@@ -85,13 +85,18 @@ class CheckAnalysis {
 
             cleaned.foreach(checkValidAggregateExpression)
 
+          case _ => // Fallbacks to the following checks
+        }
+
+        operator match {
           case o if o.children.nonEmpty && o.missingInput.nonEmpty =>
-            val missingAttributes = 
o.missingInput.map(_.prettyString).mkString(",")
-            val input = o.inputSet.map(_.prettyString).mkString(",")
+            val missingAttributes = o.missingInput.mkString(",")
+            val input = o.inputSet.mkString(",")
 
-            failAnalysis(s"resolved attributes $missingAttributes missing from 
$input")
+            failAnalysis(
+              s"resolved attribute(s) $missingAttributes missing from $input " 
+
+                s"in operator ${operator.simpleString}")
 
-          // Catch all
           case o if !o.resolved =>
             failAnalysis(
               s"unresolved operator ${operator.simpleString}")

http://git-wip-us.apache.org/repos/asf/spark/blob/6f10142d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 400a6b2..48191f3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -47,9 +47,12 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] 
extends TreeNode[PlanTy
    * Attributes that are referenced by expressions but not provided by this 
nodes children.
    * Subclasses should override this method if they produce attributes 
internally as it is used by
    * assertions designed to prevent the construction of invalid plans.
+   *
+   * Note that virtual columns should be excluded. Currently, we only support 
the grouping ID
+   * virtual column.
    */
-  def missingInput: AttributeSet = (references -- inputSet)
-    .filter(_.name != VirtualColumn.groupingIdName)
+  def missingInput: AttributeSet =
+    (references -- inputSet).filter(_.name != VirtualColumn.groupingIdName)
 
   /**
    * Runs [[transform]] with `rule` on all expressions present in this query 
operator.

http://git-wip-us.apache.org/repos/asf/spark/blob/6f10142d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index c1dd5aa..359aec4 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -199,4 +199,22 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
     assert(pl(3).dataType == DecimalType.Unlimited)
     assert(pl(4).dataType == DoubleType)
   }
+
+  test("SPARK-6452 regression test") {
+    // CheckAnalysis should throw AnalysisException when Aggregate contains 
missing attribute(s)
+    val plan =
+      Aggregate(
+        Nil,
+        Alias(Sum(AttributeReference("a", StringType)(exprId = ExprId(1))), 
"b")() :: Nil,
+        LocalRelation(
+          AttributeReference("a", StringType)(exprId = ExprId(2))))
+
+    assert(plan.resolved)
+
+    val message = intercept[AnalysisException] {
+      caseSensitiveAnalyze(plan)
+    }.getMessage
+
+    assert(message.contains("resolved attribute(s) a#1 missing from a#2"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to