Repository: spark
Updated Branches:
  refs/heads/master 6c7bb575b -> bdd27961c


[SPARK-24251][SQL] Add analysis tests for AppendData.

## What changes were proposed in this pull request?

This is a follow-up to #21305 that adds a test suite for AppendData analysis.

This also fixes the following problems uncovered by these tests:
* The incorrect order of the data types passed to `canWrite` (the query and table types were swapped) is fixed
* The field check calls `canWrite` first to ensure all errors are found
* `AppendData#resolved` must check resolution of the query's attributes
* Column names are quoted in error messages so that empty names are visible (see the sketch below)
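
For the quoting fix, a standalone illustration (hypothetical, not Spark code) of why unquoted empty names are easy to miss:

```scala
// An empty column name disappears in an unquoted list but is obvious when quoted.
val names = Seq("x", "", "y")
println(names.mkString(", "))                    // prints: x, , y
println(names.map(n => s"'$n'").mkString(", "))  // prints: 'x', '', 'y'
```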

## How was this patch tested?

This PR adds a test suite for AppendData analysis.

Closes #22043 from rdblue/SPARK-24251-add-append-data-analysis-tests.

Authored-by: Ryan Blue <b...@apache.org>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bdd27961
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bdd27961
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bdd27961

Branch: refs/heads/master
Commit: bdd27961c870a3c443686cdbb6dd0eee3ad32012
Parents: 6c7bb57
Author: Ryan Blue <b...@apache.org>
Authored: Fri Aug 10 11:10:23 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Aug 10 11:10:23 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  16 +-
 .../plans/logical/basicLogicalOperators.scala   |  15 +-
 .../analysis/DataSourceV2AnalysisSuite.scala    | 379 +++++++++++++++++++
 3 files changed, 397 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bdd27961/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a7cd96e..d00b82d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2258,8 +2258,8 @@ class Analyzer(
       if (expected.size < query.output.size) {
         throw new AnalysisException(
           s"""Cannot write to '$tableName', too many data columns:
-             |Table columns: ${expected.map(_.name).mkString(", ")}
-             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
+             |Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")}
+             |Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}""".stripMargin)
       }
 
       val errors = new mutable.ArrayBuffer[String]()
@@ -2278,8 +2278,9 @@ class Analyzer(
         if (expected.size > query.output.size) {
           throw new AnalysisException(
             s"""Cannot write to '$tableName', not enough data columns:
-               |Table columns: ${expected.map(_.name).mkString(", ")}
-               |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
+               |Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")}
+               |Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}"""
+                .stripMargin)
         }
 
         query.output.zip(expected).flatMap {
@@ -2301,12 +2302,15 @@ class Analyzer(
         queryExpr: NamedExpression,
         addError: String => Unit): Option[NamedExpression] = {
 
+      // run the type check first so type errors are reported even if the nullability check fails
+      val canWrite = DataType.canWrite(
+        queryExpr.dataType, tableAttr.dataType, resolver, tableAttr.name, addError)
+
       if (queryExpr.nullable && !tableAttr.nullable) {
         addError(s"Cannot write nullable values to non-null column '${tableAttr.name}'")
         None
 
-      } else if (!DataType.canWrite(
-          tableAttr.dataType, queryExpr.dataType, resolver, tableAttr.name, addError)) {
+      } else if (!canWrite) {
         None
 
       } else {
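
To make the intent of the hunk above concrete, here is a hypothetical, self-contained reduction of the field check (the names and signature are illustrative stand-ins, not Spark's API): evaluating the type check eagerly records its errors even when the nullability branch also fails, so both problems surface at once.

```scala
import scala.collection.mutable

def checkField(
    queryNullable: Boolean,
    tableNullable: Boolean,
    typesCompatible: Boolean, // stands in for DataType.canWrite
    name: String,
    addError: String => Unit): Boolean = {
  // run the type check first so its error is always recorded
  val canWrite = {
    if (!typesCompatible) addError(s"Cannot safely cast '$name'")
    typesCompatible
  }
  if (queryNullable && !tableNullable) {
    addError(s"Cannot write nullable values to non-null column '$name'")
    false
  } else {
    canWrite
  }
}

val errors = new mutable.ArrayBuffer[String]()
checkField(queryNullable = true, tableNullable = false,
  typesCompatible = false, name = "x", errors += _)
assert(errors.size == 2) // both the cast and the nullability error are reported
```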

http://git-wip-us.apache.org/repos/asf/spark/blob/bdd27961/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 0d31c6f..a6631a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -363,13 +363,14 @@ case class AppendData(
   override def output: Seq[Attribute] = Seq.empty
 
   override lazy val resolved: Boolean = {
-    query.output.size == table.output.size && query.output.zip(table.output).forall {
-      case (inAttr, outAttr) =>
-          // names and types must match, nullability must be compatible
-          inAttr.name == outAttr.name &&
-          DataType.equalsIgnoreCompatibleNullability(outAttr.dataType, inAttr.dataType) &&
-          (outAttr.nullable || !inAttr.nullable)
-    }
+    table.resolved && query.resolved && query.output.size == table.output.size &&
+        query.output.zip(table.output).forall {
+          case (inAttr, outAttr) =>
+            // names and types must match, nullability must be compatible
+            inAttr.name == outAttr.name &&
+                DataType.equalsIgnoreCompatibleNullability(outAttr.dataType, inAttr.dataType) &&
+                (outAttr.nullable || !inAttr.nullable)
+        }
   }
 }
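
A hypothetical, self-contained reduction of the `resolved` change (simplified stand-ins, not Spark's classes) shows the gap the new guards close: a query whose own attributes were still unresolved could previously make `AppendData` appear resolved, because only the two outputs were compared.

```scala
case class Attr(name: String, resolved: Boolean)
case class Plan(output: Seq[Attr]) {
  def resolved: Boolean = output.forall(_.resolved)
}

def appendDataResolved(table: Plan, query: Plan): Boolean =
  table.resolved && query.resolved && // the guards added in this patch
    query.output.size == table.output.size &&
    query.output.zip(table.output).forall { case (in, out) => in.name == out.name }

val table = Plan(Seq(Attr("x", resolved = true)))
val unresolvedQuery = Plan(Seq(Attr("x", resolved = false)))
assert(!appendDataResolved(table, unresolvedQuery)) // the old logic returned true here
```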
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bdd27961/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
new file mode 100644
index 0000000..6c899b6
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import java.util.Locale
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, UpCast}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LeafNode, LogicalPlan, Project}
+import org.apache.spark.sql.types.{DoubleType, FloatType, StructField, StructType}
+
+case class TestRelation(output: Seq[AttributeReference]) extends LeafNode with NamedRelation {
+  override def name: String = "table-name"
+}
+
+class DataSourceV2AnalysisSuite extends AnalysisTest {
+  val table = TestRelation(StructType(Seq(
+    StructField("x", FloatType),
+    StructField("y", FloatType))).toAttributes)
+
+  val requiredTable = TestRelation(StructType(Seq(
+    StructField("x", FloatType, nullable = false),
+    StructField("y", FloatType, nullable = false))).toAttributes)
+
+  val widerTable = TestRelation(StructType(Seq(
+    StructField("x", DoubleType),
+    StructField("y", DoubleType))).toAttributes)
+
+  test("Append.byName: basic behavior") {
+    val query = TestRelation(table.schema.toAttributes)
+
+    val parsedPlan = AppendData.byName(table, query)
+
+    checkAnalysis(parsedPlan, parsedPlan)
+    assertResolved(parsedPlan)
+  }
+
+  test("Append.byName: does not match by position") {
+    val query = TestRelation(StructType(Seq(
+      StructField("a", FloatType),
+      StructField("b", FloatType))).toAttributes)
+
+    val parsedPlan = AppendData.byName(table, query)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      "Cannot find data for output column", "'x'", "'y'"))
+  }
+
+  test("Append.byName: case sensitive column resolution") {
+    val query = TestRelation(StructType(Seq(
+      StructField("X", FloatType), // doesn't match case!
+      StructField("y", FloatType))).toAttributes)
+
+    val parsedPlan = AppendData.byName(table, query)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      "Cannot find data for output column", "'x'"),
+      caseSensitive = true)
+  }
+
+  test("Append.byName: case insensitive column resolution") {
+    val query = TestRelation(StructType(Seq(
+      StructField("X", FloatType), // doesn't match case!
+      StructField("y", FloatType))).toAttributes)
+
+    val X = query.output.head
+    val y = query.output.last
+
+    val parsedPlan = AppendData.byName(table, query)
+    val expectedPlan = AppendData.byName(table,
+      Project(Seq(
+        Alias(Cast(toLower(X), FloatType, Some(conf.sessionLocalTimeZone)), "x")(),
+        Alias(Cast(y, FloatType, Some(conf.sessionLocalTimeZone)), "y")()),
+        query))
+
+    assertNotResolved(parsedPlan)
+    checkAnalysis(parsedPlan, expectedPlan, caseSensitive = false)
+    assertResolved(expectedPlan)
+  }
+
+  test("Append.byName: data columns are reordered by name") {
+    // out of order
+    val query = TestRelation(StructType(Seq(
+      StructField("y", FloatType),
+      StructField("x", FloatType))).toAttributes)
+
+    val y = query.output.head
+    val x = query.output.last
+
+    val parsedPlan = AppendData.byName(table, query)
+    val expectedPlan = AppendData.byName(table,
+      Project(Seq(
+        Alias(Cast(x, FloatType, Some(conf.sessionLocalTimeZone)), "x")(),
+        Alias(Cast(y, FloatType, Some(conf.sessionLocalTimeZone)), "y")()),
+        query))
+
+    assertNotResolved(parsedPlan)
+    checkAnalysis(parsedPlan, expectedPlan)
+    assertResolved(expectedPlan)
+  }
+
+  test("Append.byName: fail nullable data written to required columns") {
+    val parsedPlan = AppendData.byName(requiredTable, table)
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      "Cannot write nullable values to non-null column", "'x'", "'y'"))
+  }
+
+  test("Append.byName: allow required data written to nullable columns") {
+    val parsedPlan = AppendData.byName(table, requiredTable)
+    assertResolved(parsedPlan)
+    checkAnalysis(parsedPlan, parsedPlan)
+  }
+
+  test("Append.byName: missing required columns cause failure and are identified by name") {
+    // missing required field x
+    val query = TestRelation(StructType(Seq(
+      StructField("y", FloatType, nullable = false))).toAttributes)
+
+    val parsedPlan = AppendData.byName(requiredTable, query)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      "Cannot find data for output column", "'x'"))
+  }
+
+  test("Append.byName: missing optional columns cause failure and are identified by name") {
+    // missing optional field x
+    val query = TestRelation(StructType(Seq(
+      StructField("y", FloatType))).toAttributes)
+
+    val parsedPlan = AppendData.byName(table, query)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      "Cannot find data for output column", "'x'"))
+  }
+
+  test("Append.byName: fail canWrite check") {
+    val parsedPlan = AppendData.byName(table, widerTable)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write", "'table-name'",
+      "Cannot safely cast", "'x'", "'y'", "DoubleType to FloatType"))
+  }
+
+  test("Append.byName: insert safe cast") {
+    val x = table.output.head
+    val y = table.output.last
+
+    val parsedPlan = AppendData.byName(widerTable, table)
+    val expectedPlan = AppendData.byName(widerTable,
+      Project(Seq(
+        Alias(Cast(x, DoubleType, Some(conf.sessionLocalTimeZone)), "x")(),
+        Alias(Cast(y, DoubleType, Some(conf.sessionLocalTimeZone)), "y")()),
+        table))
+
+    assertNotResolved(parsedPlan)
+    checkAnalysis(parsedPlan, expectedPlan)
+    assertResolved(expectedPlan)
+  }
+
+  test("Append.byName: fail extra data fields") {
+    val query = TestRelation(StructType(Seq(
+      StructField("x", FloatType),
+      StructField("y", FloatType),
+      StructField("z", FloatType))).toAttributes)
+
+    val parsedPlan = AppendData.byName(table, query)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write", "'table-name'", "too many data columns",
+      "Table columns: 'x', 'y'",
+      "Data columns: 'x', 'y', 'z'"))
+  }
+
+  test("Append.byName: multiple field errors are reported") {
+    val xRequiredTable = TestRelation(StructType(Seq(
+      StructField("x", FloatType, nullable = false),
+      StructField("y", DoubleType))).toAttributes)
+
+    val query = TestRelation(StructType(Seq(
+      StructField("x", DoubleType),
+      StructField("b", FloatType))).toAttributes)
+
+    val parsedPlan = AppendData.byName(xRequiredTable, query)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      "Cannot safely cast", "'x'", "DoubleType to FloatType",
+      "Cannot write nullable values to non-null column", "'x'",
+      "Cannot find data for output column", "'y'"))
+  }
+
+  test("Append.byPosition: basic behavior") {
+    val query = TestRelation(StructType(Seq(
+      StructField("a", FloatType),
+      StructField("b", FloatType))).toAttributes)
+
+    val a = query.output.head
+    val b = query.output.last
+
+    val parsedPlan = AppendData.byPosition(table, query)
+    val expectedPlan = AppendData.byPosition(table,
+      Project(Seq(
+        Alias(Cast(a, FloatType, Some(conf.sessionLocalTimeZone)), "x")(),
+        Alias(Cast(b, FloatType, Some(conf.sessionLocalTimeZone)), "y")()),
+        query))
+
+    assertNotResolved(parsedPlan)
+    checkAnalysis(parsedPlan, expectedPlan, caseSensitive = false)
+    assertResolved(expectedPlan)
+  }
+
+  test("Append.byPosition: data columns are not reordered") {
+    // out of order
+    val query = TestRelation(StructType(Seq(
+      StructField("y", FloatType),
+      StructField("x", FloatType))).toAttributes)
+
+    val y = query.output.head
+    val x = query.output.last
+
+    val parsedPlan = AppendData.byPosition(table, query)
+    val expectedPlan = AppendData.byPosition(table,
+      Project(Seq(
+        Alias(Cast(y, FloatType, Some(conf.sessionLocalTimeZone)), "x")(),
+        Alias(Cast(x, FloatType, Some(conf.sessionLocalTimeZone)), "y")()),
+        query))
+
+    assertNotResolved(parsedPlan)
+    checkAnalysis(parsedPlan, expectedPlan)
+    assertResolved(expectedPlan)
+  }
+
+  test("Append.byPosition: fail nullable data written to required columns") {
+    val parsedPlan = AppendData.byPosition(requiredTable, table)
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      "Cannot write nullable values to non-null column", "'x'", "'y'"))
+  }
+
+  test("Append.byPosition: allow required data written to nullable columns") {
+    val parsedPlan = AppendData.byPosition(table, requiredTable)
+    assertResolved(parsedPlan)
+    checkAnalysis(parsedPlan, parsedPlan)
+  }
+
+  test("Append.byPosition: missing required columns cause failure") {
+    // missing required field x
+    val query = TestRelation(StructType(Seq(
+      StructField("y", FloatType, nullable = false))).toAttributes)
+
+    val parsedPlan = AppendData.byPosition(requiredTable, query)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write", "'table-name'", "not enough data columns",
+      "Table columns: 'x', 'y'",
+      "Data columns: 'y'"))
+  }
+
+  test("Append.byPosition: missing optional columns cause failure") {
+    // missing optional field x
+    val query = TestRelation(StructType(Seq(
+      StructField("y", FloatType))).toAttributes)
+
+    val parsedPlan = AppendData.byPosition(table, query)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write", "'table-name'", "not enough data columns",
+      "Table columns: 'x', 'y'",
+      "Data columns: 'y'"))
+  }
+
+  test("Append.byPosition: fail canWrite check") {
+    val widerTable = TestRelation(StructType(Seq(
+      StructField("a", DoubleType),
+      StructField("b", DoubleType))).toAttributes)
+
+    val parsedPlan = AppendData.byPosition(table, widerTable)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write", "'table-name'",
+      "Cannot safely cast", "'x'", "'y'", "DoubleType to FloatType"))
+  }
+
+  test("Append.byPosition: insert safe cast") {
+    val widerTable = TestRelation(StructType(Seq(
+      StructField("a", DoubleType),
+      StructField("b", DoubleType))).toAttributes)
+
+    val x = table.output.head
+    val y = table.output.last
+
+    val parsedPlan = AppendData.byPosition(widerTable, table)
+    val expectedPlan = AppendData.byPosition(widerTable,
+      Project(Seq(
+        Alias(Cast(x, DoubleType, Some(conf.sessionLocalTimeZone)), "a")(),
+        Alias(Cast(y, DoubleType, Some(conf.sessionLocalTimeZone)), "b")()),
+        table))
+
+    assertNotResolved(parsedPlan)
+    checkAnalysis(parsedPlan, expectedPlan)
+    assertResolved(expectedPlan)
+  }
+
+  test("Append.byPosition: fail extra data fields") {
+    val query = TestRelation(StructType(Seq(
+      StructField("a", FloatType),
+      StructField("b", FloatType),
+      StructField("c", FloatType))).toAttributes)
+
+    val parsedPlan = AppendData.byPosition(table, query)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write", "'table-name'", "too many data columns",
+      "Table columns: 'x', 'y'",
+      "Data columns: 'a', 'b', 'c'"))
+  }
+
+  test("Append.byPosition: multiple field errors are reported") {
+    val xRequiredTable = TestRelation(StructType(Seq(
+      StructField("x", FloatType, nullable = false),
+      StructField("y", DoubleType))).toAttributes)
+
+    val query = TestRelation(StructType(Seq(
+      StructField("x", DoubleType),
+      StructField("b", FloatType))).toAttributes)
+
+    val parsedPlan = AppendData.byPosition(xRequiredTable, query)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      "Cannot write nullable values to non-null column", "'x'",
+      "Cannot safely cast", "'x'", "DoubleType to FloatType"))
+  }
+
+  def assertNotResolved(logicalPlan: LogicalPlan): Unit = {
+    assert(!logicalPlan.resolved, s"Plan should not be resolved: $logicalPlan")
+  }
+
+  def assertResolved(logicalPlan: LogicalPlan): Unit = {
+    assert(logicalPlan.resolved, s"Plan should be resolved: $logicalPlan")
+  }
+
+  def toLower(attr: AttributeReference): AttributeReference = {
+    AttributeReference(attr.name.toLowerCase(Locale.ROOT), attr.dataType)(attr.exprId)
+  }
+}

