Repository: spark
Updated Branches:
  refs/heads/master 5c2ae79bf -> 762366fd8


[SPARK-16552][SQL] Store the Inferred Schemas into External Catalog Tables when 
Creating Tables

#### What changes were proposed in this pull request?

Currently, in Spark SQL, the way a table's schema is initially created falls
into two groups. This applies to both Hive tables and Data Source tables:

**Group A. Users specify the schema.**

_Case 1 CREATE TABLE AS SELECT_: the schema is determined by the result schema 
of the SELECT clause. For example,
```SQL
CREATE TABLE tab STORED AS TEXTFILE
AS SELECT * from input
```

_Case 2 CREATE TABLE_: users explicitly specify the schema. For example,
```SQL
CREATE TABLE jsonTable (_1 string, _2 string)
USING org.apache.spark.sql.json
```

**Group B. Spark SQL infers the schema at runtime.**

_Case 3 CREATE TABLE_: users do not specify the schema, only the path to the
file location. For example,
```SQL
CREATE TABLE jsonTable
USING org.apache.spark.sql.json
OPTIONS (path '${tempDir.getCanonicalPath}')
```

Before this PR, Spark SQL did not store the inferred schema in the external
catalog for the cases in Group B. When users refreshed the metadata cache, or
accessed the table for the first time after (re-)starting Spark, Spark SQL
inferred the schema and stored it in the metadata cache to improve the
performance of subsequent metadata requests. However, this runtime schema
inference could cause undesirable schema changes after each restart of Spark.

This PR stores the inferred schema in the external catalog when creating
the table. When users intend to refresh the schema after possible changes to
external files (table location), they issue `REFRESH TABLE`. Spark SQL will
infer the schema again based on the previously specified table location and
update/refresh the schema in the external catalog and metadata cache.

In this PR, we do not use the inferred schema to replace the user-specified
schema, in order to avoid external behavior changes. Based on the design,
user-specified schemas (as described in Group A) can be changed by ALTER TABLE
commands, although we do not support them now.
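
As a rough illustration of the create-time behavior described above, the following self-contained sketch models a catalog that infers a schema once, when the table is created, and stores the result. `ToyFileSystem`, `ToyCatalog`, and `Schema` are hypothetical stand-ins and not Spark APIs; the actual change is in `CreateDataSourceTableCommand`.

```scala
// Toy model only: ToyFileSystem, ToyCatalog, and Schema are hypothetical names, not Spark APIs.
object StoredSchemaSketch {
  type Schema = Seq[(String, String)] // (column name, type name)

  // Stand-in for the files under a table location: path -> columns currently found there.
  final class ToyFileSystem {
    private var files = Map.empty[String, Schema]
    def write(path: String, schema: Schema): Unit = {
      files = files.updated(path, schema)
    }
    def inferSchema(path: String): Schema = files.getOrElse(path, Seq.empty)
  }

  final class ToyCatalog(fs: ToyFileSystem) {
    private var tables = Map.empty[String, Schema]

    // Group A passes Some(schema); Group B passes None, and the schema is inferred once, here.
    def createTable(name: String, path: String, userSchema: Option[Schema]): Unit = {
      val schema = userSchema.getOrElse(fs.inferSchema(path))
      tables = tables.updated(name, schema)
    }

    // Lookups return what was stored at creation time, not what the files hold now.
    def schemaOf(name: String): Schema = tables(name)
  }
}
```

In this toy model, rewriting the files under the table location after `createTable` does not change what `schemaOf` returns, which is the stability the stored schema is meant to provide across restarts.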

#### How was this patch tested?
TODO: add more cases to cover the changes.

Author: gatorsmile <gatorsm...@gmail.com>

Closes #14207 from gatorsmile/userSpecifiedSchema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/762366fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/762366fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/762366fd

Branch: refs/heads/master
Commit: 762366fd8722f2b3fa98b8da9338b757a1821708
Parents: 5c2ae79
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Thu Jul 28 17:29:26 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Jul 28 17:29:26 2016 +0800

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        |  84 ++++----
 .../spark/sql/execution/command/ddl.scala       |  22 +-
 .../spark/sql/execution/command/tables.scala    |  36 ++--
 .../datasources/DataSourceStrategy.scala        |   4 +-
 .../apache/spark/sql/internal/CatalogImpl.scala |   4 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 210 ++++++++++++++++++-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  10 +-
 7 files changed, 291 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/762366fd/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 5e3cd9f..fa3967c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.HiveSerDe
-import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 import org.apache.spark.sql.types._
 
 /**
@@ -52,7 +52,7 @@ case class CreateDataSourceTableCommand(
     userSpecifiedSchema: Option[StructType],
     provider: String,
     options: Map[String, String],
-    partitionColumns: Array[String],
+    userSpecifiedPartitionColumns: Array[String],
     bucketSpec: Option[BucketSpec],
     ignoreIfExists: Boolean,
     managedIfNoPath: Boolean)
@@ -95,17 +95,39 @@ case class CreateDataSourceTableCommand(
       }
 
     // Create the relation to validate the arguments before writing the metadata to the metastore.
-    DataSource(
-      sparkSession = sparkSession,
-      userSpecifiedSchema = userSpecifiedSchema,
-      className = provider,
-      bucketSpec = None,
-      options = optionsWithPath).resolveRelation(checkPathExist = false)
+    val dataSource: BaseRelation =
+      DataSource(
+        sparkSession = sparkSession,
+        userSpecifiedSchema = userSpecifiedSchema,
+        className = provider,
+        bucketSpec = None,
+        options = optionsWithPath).resolveRelation(checkPathExist = false)
+
+    val partitionColumns = if (userSpecifiedSchema.nonEmpty) {
+      userSpecifiedPartitionColumns
+    } else {
+      val res = dataSource match {
+        case r: HadoopFsRelation => r.partitionSchema.fieldNames
+        case _ => Array.empty[String]
+      }
+      if (userSpecifiedPartitionColumns.length > 0) {
+        // The table does not have a specified schema, which means that the schema will be inferred
+        // when we load the table. So, we are not expecting partition columns and we will discover
+        // partitions when we load the table. However, if there are specified partition columns,
+        // we simply ignore them and provide a warning message.
+        logWarning(
+          s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " +
+            s"ignored. The schema and partition columns of table $tableIdent are inferred. " +
+            s"Schema: ${dataSource.schema.simpleString}; " +
+            s"Partition columns: ${res.mkString("(", ", ", ")")}")
+      }
+      res
+    }
 
     CreateDataSourceTableUtils.createDataSourceTable(
       sparkSession = sparkSession,
       tableIdent = tableIdent,
-      userSpecifiedSchema = userSpecifiedSchema,
+      schema = dataSource.schema,
       partitionColumns = partitionColumns,
       bucketSpec = bucketSpec,
       provider = provider,
@@ -213,7 +235,7 @@ case class CreateDataSourceTableAsSelectCommand(
               }
               existingSchema = Some(l.schema)
             case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-              existingSchema = DDLUtils.getSchemaFromTableProperties(s.metadata)
+              existingSchema = Some(DDLUtils.getSchemaFromTableProperties(s.metadata))
             case o =>
               throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
           }
@@ -256,7 +278,7 @@ case class CreateDataSourceTableAsSelectCommand(
       CreateDataSourceTableUtils.createDataSourceTable(
         sparkSession = sparkSession,
         tableIdent = tableIdent,
-        userSpecifiedSchema = Some(result.schema),
+        schema = result.schema,
         partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,
         provider = provider,
@@ -306,7 +328,7 @@ object CreateDataSourceTableUtils extends Logging {
   def createDataSourceTable(
       sparkSession: SparkSession,
       tableIdent: TableIdentifier,
-      userSpecifiedSchema: Option[StructType],
+      schema: StructType,
       partitionColumns: Array[String],
       bucketSpec: Option[BucketSpec],
       provider: String,
@@ -315,28 +337,26 @@ object CreateDataSourceTableUtils extends Logging {
     val tableProperties = new mutable.HashMap[String, String]
     tableProperties.put(DATASOURCE_PROVIDER, provider)
 
-    // Saves optional user specified schema.  Serialized JSON schema string may be too long to be
-    // stored into a single metastore SerDe property.  In this case, we split the JSON string and
-    // store each part as a separate SerDe property.
-    userSpecifiedSchema.foreach { schema =>
-      val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
-      val schemaJsonString = schema.json
-      // Split the JSON string.
-      val parts = schemaJsonString.grouped(threshold).toSeq
-      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
-      parts.zipWithIndex.foreach { case (part, index) =>
-        tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
-      }
+    // Serialized JSON schema string may be too long to be stored into a single metastore table
+    // property. In this case, we split the JSON string and store each part as a separate table
+    // property.
+    val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
+    val schemaJsonString = schema.json
+    // Split the JSON string.
+    val parts = schemaJsonString.grouped(threshold).toSeq
+    tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
+    parts.zipWithIndex.foreach { case (part, index) =>
+      tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
     }
 
-    if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
+    if (partitionColumns.length > 0) {
       tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
       partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
         tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
       }
     }
 
-    if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
+    if (bucketSpec.isDefined) {
       val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
 
       tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
@@ -353,16 +373,6 @@ object CreateDataSourceTableUtils extends Logging {
       }
     }
 
-    if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
-      // The table does not have a specified schema, which means that the schema will be inferred
-      // when we load the table. So, we are not expecting partition columns and we will discover
-      // partitions when we load the table. However, if there are specified partition columns,
-      // we simply ignore them and provide a warning message.
-      logWarning(
-        s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
-          s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
-    }
-
     val tableType = if (isExternal) {
       tableProperties.put("EXTERNAL", "TRUE")
       CatalogTableType.EXTERNAL
@@ -375,7 +385,7 @@ object CreateDataSourceTableUtils extends Logging {
     val dataSource =
       DataSource(
         sparkSession,
-        userSpecifiedSchema = userSpecifiedSchema,
+        userSpecifiedSchema = Some(schema),
         partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,
         className = provider,
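
The partition-column selection in `CreateDataSourceTableCommand` above can be sketched in isolation as follows. This is a minimal, Spark-free illustration: `PartitionColumnsSketch` and `resolve` are hypothetical names, the schema is reduced to a list of column names, and the warning is returned as a value rather than logged.

```scala
// Hypothetical, Spark-free sketch of the selection logic; not the actual implementation.
object PartitionColumnsSketch {
  // Returns the partition columns to store, plus an optional warning message.
  def resolve(
      userSpecifiedSchema: Option[Seq[String]],
      userSpecifiedPartitionColumns: Seq[String],
      inferredPartitionColumns: Seq[String]): (Seq[String], Option[String]) = {
    if (userSpecifiedSchema.nonEmpty) {
      // A user-specified schema means the user's partition columns are taken as given.
      (userSpecifiedPartitionColumns, None)
    } else {
      // No schema was given, so partition columns come from inference. Any
      // user-specified partition columns are ignored, with a warning.
      val ignored = userSpecifiedPartitionColumns.mkString(",")
      val inferred = inferredPartitionColumns.mkString("(", ", ", ")")
      val warning =
        if (userSpecifiedPartitionColumns.nonEmpty) {
          Some(s"Specified partition columns ($ignored) will be ignored. " +
            s"Partition columns: $inferred")
        } else {
          None
        }
      (inferredPartitionColumns, warning)
    }
  }
}
```

The cases exercised by the new `DDLSuite` tests map onto this sketch directly: with no schema and `PARTITIONED BY (inexistentColumns)`, the inferred columns win and a warning is produced.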

http://git-wip-us.apache.org/repos/asf/spark/blob/762366fd/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 03f81c4..7e99593 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -521,31 +521,29 @@ object DDLUtils {
     table.partitionColumns.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
   }
 
-  // A persisted data source table may not store its schema in the catalog. In this case, its schema
-  // will be inferred at runtime when the table is referenced.
-  def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
+  // A persisted data source table always store its schema in the catalog.
+  def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
     require(isDatasourceTable(metadata))
+    val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted."
     val props = metadata.properties
-    if (props.isDefinedAt(DATASOURCE_SCHEMA)) {
+    props.get(DATASOURCE_SCHEMA).map { schema =>
       // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
       // After SPARK-6024, we removed this flag.
       // Although we are not using spark.sql.sources.schema any more, we need to still support.
-      props.get(DATASOURCE_SCHEMA).map(DataType.fromJson(_).asInstanceOf[StructType])
-    } else {
-      metadata.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
+      DataType.fromJson(schema).asInstanceOf[StructType]
+    } getOrElse {
+      props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
         val parts = (0 until numParts.toInt).map { index =>
           val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
           if (part == null) {
-            throw new AnalysisException(
-              "Could not read schema from the metastore because it is corrupted " +
-                s"(missing part $index of the schema, $numParts parts are expected).")
+            throw new AnalysisException(msgSchemaCorrupted +
+              s" (missing part $index of the schema, $numParts parts are expected).")
           }
-
           part
         }
         // Stick all parts back to a single schema string.
         DataType.fromJson(parts.mkString).asInstanceOf[StructType]
-      }
+      } getOrElse(throw new AnalysisException(msgSchemaCorrupted))
     }
   }
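
The split-and-reassemble scheme used by `createDataSourceTable` and `getSchemaFromTableProperties` can be sketched without Spark as a round trip over a plain property map. In this hedged example the keys `schema.numParts` and `schema.part.` are illustrative placeholders (the real constants are defined in `CreateDataSourceTableUtils`), and `IllegalStateException` stands in for `AnalysisException`.

```scala
// Spark-free sketch; key names and the exception type are placeholders, not Spark's own.
object SchemaPartsSketch {
  val NumParts = "schema.numParts"
  val PartPrefix = "schema.part."

  // Split the serialized schema into chunks of at most `threshold` characters,
  // storing each chunk under its own key plus a count of the chunks.
  def toProperties(schemaJson: String, threshold: Int): Map[String, String] = {
    require(threshold > 0, "threshold must be positive")
    val parts = schemaJson.grouped(threshold).toSeq
    val indexed = parts.zipWithIndex.map { case (part, i) => s"$PartPrefix$i" -> part }
    indexed.toMap + (NumParts -> parts.size.toString)
  }

  // Reassemble the chunks; a missing count or a missing chunk is treated as corruption.
  def fromProperties(props: Map[String, String]): String = {
    val corrupted = "Could not read schema from the metastore because it is corrupted."
    val numParts = props.getOrElse(NumParts, throw new IllegalStateException(corrupted)).toInt
    (0 until numParts).map { i =>
      props.getOrElse(
        s"$PartPrefix$i",
        throw new IllegalStateException(
          s"$corrupted (missing part $i of the schema, $numParts parts are expected)."))
    }.mkString
  }
}
```

Because the schema is now always written at creation time, a missing part count is reported as corruption rather than being treated as "no schema stored", which mirrors the `getOrElse(throw ...)` branch in the hunk above.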
 

http://git-wip-us.apache.org/repos/asf/spark/blob/762366fd/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 8263380..f85373c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -416,15 +416,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
     } else {
       val metadata = catalog.getTableMetadata(table)
 
-      if (DDLUtils.isDatasourceTable(metadata)) {
-        DDLUtils.getSchemaFromTableProperties(metadata) match {
-          case Some(userSpecifiedSchema) => describeSchema(userSpecifiedSchema, result)
-          case None => describeSchema(catalog.lookupRelation(table).schema, result)
-        }
-      } else {
-        describeSchema(metadata.schema, result)
-      }
-
+      describeSchema(metadata, result)
       if (isExtended) {
         describeExtended(metadata, result)
       } else if (isFormatted) {
@@ -439,12 +431,12 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
 
   private def describePartitionInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
     if (DDLUtils.isDatasourceTable(table)) {
-      val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
       val partColNames = DDLUtils.getPartitionColumnsFromTableProperties(table)
-      for (schema <- userSpecifiedSchema if partColNames.nonEmpty) {
+      if (partColNames.nonEmpty) {
+        val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
         append(buffer, "# Partition Information", "", "")
         append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
-        describeSchema(StructType(partColNames.map(schema(_))), buffer)
+        describeSchema(StructType(partColNames.map(userSpecifiedSchema(_))), buffer)
       }
     } else {
       if (table.partitionColumns.nonEmpty) {
@@ -518,6 +510,17 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
     }
   }
 
+  private def describeSchema(
+      tableDesc: CatalogTable,
+      buffer: ArrayBuffer[Row]): Unit = {
+    if (DDLUtils.isDatasourceTable(tableDesc)) {
+      val schema = DDLUtils.getSchemaFromTableProperties(tableDesc)
+      describeSchema(schema, buffer)
+    } else {
+      describeSchema(tableDesc.schema, buffer)
+    }
+  }
+
   private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
     schema.foreach { column =>
       append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
@@ -876,12 +879,9 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
 
   private def showDataSourceTableDataColumns(
       metadata: CatalogTable, builder: StringBuilder): Unit = {
-    DDLUtils.getSchemaFromTableProperties(metadata).foreach { schema =>
-      val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
-      builder ++= columns.mkString("(", ", ", ")")
-    }
-
-    builder ++= "\n"
+    val schema = DDLUtils.getSchemaFromTableProperties(metadata)
+    val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
+    builder ++= columns.mkString("(", ", ", ")\n")
   }
 
   private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
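
For the `showDataSourceTableDataColumns` change above, the following Spark-free sketch shows how the column list is rendered now that a schema is always available. `ShowColumnsSketch` and `renderColumns` are hypothetical names, and the `quoteIdentifier` defined here is an assumed stand-in (backtick quoting with embedded backticks doubled) rather than Spark's own helper.

```scala
// Hypothetical sketch; quoteIdentifier here is an assumed re-implementation for illustration.
object ShowColumnsSketch {
  // Assumed quoting rule: wrap in backticks and double any embedded backtick.
  def quoteIdentifier(name: String): String = "`" + name.replace("`", "``") + "`"

  // fields: (column name, SQL type string). The column list is always emitted,
  // followed by a newline, since the schema is no longer optional.
  def renderColumns(fields: Seq[(String, String)]): String = {
    val columns = fields.map { case (name, sqlType) => s"${quoteIdentifier(name)} $sqlType" }
    columns.mkString("(", ", ", ")\n")
  }
}
```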

http://git-wip-us.apache.org/repos/asf/spark/blob/762366fd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 8ffdc50..ca03b26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -205,7 +205,7 @@ private[sql] case class DataSourceAnalysis(conf: CatalystConf) extends Rule[Logi
  */
 private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = {
-    val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
+    val schema = DDLUtils.getSchemaFromTableProperties(table)
 
     // We only need names at here since userSpecifiedSchema we loaded from the metastore
     // contains partition columns. We can always get datatypes of partitioning columns
@@ -218,7 +218,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[
     val dataSource =
       DataSource(
         sparkSession,
-        userSpecifiedSchema = userSpecifiedSchema,
+        userSpecifiedSchema = Some(schema),
         partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,
         className = table.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER),

http://git-wip-us.apache.org/repos/asf/spark/blob/762366fd/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 05dfb8c..5393b76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -352,13 +352,15 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
 
   /**
    * Refresh the cache entry for a table, if any. For Hive metastore table, the metadata
-   * is refreshed.
+   * is refreshed. For data source tables, the schema will not be inferred and refreshed.
    *
    * @group cachemgmt
    * @since 2.0.0
    */
   override def refreshTable(tableName: String): Unit = {
     val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    // Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively.
+    // Non-temp tables: refresh the metadata cache.
     sessionCatalog.refreshTable(tableIdent)
 
     // If this table is cached as an InMemoryRelation, drop the original

http://git-wip-us.apache.org/repos/asf/spark/blob/762366fd/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index a354594..7bd1b0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.internal.config._
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
@@ -252,6 +252,208 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  private def checkSchemaInCreatedDataSourceTable(
+      path: File,
+      userSpecifiedSchema: Option[String],
+      userSpecifiedPartitionCols: Option[String],
+      expectedSchema: StructType,
+      expectedPartitionCols: Seq[String]): Unit = {
+    var tableSchema = StructType(Nil)
+    var partCols = Seq.empty[String]
+
+    val tabName = "tab1"
+    withTable(tabName) {
+      val partitionClause =
+        userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("")
+      val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("")
+      sql(
+        s"""
+           |CREATE TABLE $tabName $schemaClause
+           |USING parquet
+           |OPTIONS (
+           |  path '$path'
+           |)
+           |$partitionClause
+         """.stripMargin)
+      val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
+
+      tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
+      partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
+    }
+    assert(tableSchema == expectedSchema)
+    assert(partCols == expectedPartitionCols)
+  }
+
+  test("Create partitioned data source table without user specified schema") {
+    import testImplicits._
+    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
+
+    // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
+    // Case 2: without schema and partitioning columns: None
+    Seq(Option("inexistentColumns"), None).foreach { partitionCols =>
+      withTempPath { pathToPartitionedTable =>
+        df.write.format("parquet").partitionBy("num")
+          .save(pathToPartitionedTable.getCanonicalPath)
+        checkSchemaInCreatedDataSourceTable(
+          pathToPartitionedTable,
+          userSpecifiedSchema = None,
+          userSpecifiedPartitionCols = partitionCols,
+          expectedSchema = new StructType().add("str", StringType).add("num", IntegerType),
+          expectedPartitionCols = Seq("num"))
+      }
+    }
+  }
+
+  test("Create partitioned data source table with user specified schema") {
+    import testImplicits._
+    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
+
+    // Case 1: with partitioning columns but no schema: Option("num")
+    // Case 2: without schema and partitioning columns: None
+    Seq(Option("num"), None).foreach { partitionCols =>
+      withTempPath { pathToPartitionedTable =>
+        df.write.format("parquet").partitionBy("num")
+          .save(pathToPartitionedTable.getCanonicalPath)
+        checkSchemaInCreatedDataSourceTable(
+          pathToPartitionedTable,
+          userSpecifiedSchema = Option("num int, str string"),
+          userSpecifiedPartitionCols = partitionCols,
+          expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+          expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))
+      }
+    }
+  }
+
+  test("Create non-partitioned data source table without user specified schema") {
+    import testImplicits._
+    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
+
+    // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
+    // Case 2: without schema and partitioning columns: None
+    Seq(Option("inexistentColumns"), None).foreach { partitionCols =>
+      withTempPath { pathToNonPartitionedTable =>
+        df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath)
+        checkSchemaInCreatedDataSourceTable(
+          pathToNonPartitionedTable,
+          userSpecifiedSchema = None,
+          userSpecifiedPartitionCols = partitionCols,
+          expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+          expectedPartitionCols = Seq.empty[String])
+      }
+    }
+  }
+
+  test("Create non-partitioned data source table with user specified schema") {
+    import testImplicits._
+    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
+
+    // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
+    // Case 2: without schema and partitioning columns: None
+    Seq(Option("num"), None).foreach { partitionCols =>
+      withTempPath { pathToNonPartitionedTable =>
+        df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath)
+        checkSchemaInCreatedDataSourceTable(
+          pathToNonPartitionedTable,
+          userSpecifiedSchema = Option("num int, str string"),
+          userSpecifiedPartitionCols = partitionCols,
+          expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+          expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))
+      }
+    }
+  }
+
+  test("Describe Table with Corrupted Schema") {
+    import testImplicits._
+
+    val tabName = "tab1"
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("col1", "col2")
+      df.write.format("json").save(path)
+
+      withTable(tabName) {
+        sql(
+          s"""
+             |CREATE TABLE $tabName
+             |USING json
+             |OPTIONS (
+             |  path '$path'
+             |)
+           """.stripMargin)
+
+        val catalog = spark.sessionState.catalog
+        val table = catalog.getTableMetadata(TableIdentifier(tabName))
+        val newProperties = table.properties.filterKeys(key =>
+          key != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS)
+        val newTable = table.copy(properties = newProperties)
+        catalog.alterTable(newTable)
+
+        val e = intercept[AnalysisException] {
+          sql(s"DESC $tabName")
+        }.getMessage
+        assert(e.contains(s"Could not read schema from the metastore because it is corrupted"))
+      }
+    }
+  }
+
+  test("Refresh table after changing the data source table partitioning") {
+    import testImplicits._
+
+    val tabName = "tab1"
+    val catalog = spark.sessionState.catalog
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString, i, i))
+        .toDF("col1", "col2", "col3", "col4")
+      df.write.format("json").partitionBy("col1", "col3").save(path)
+      val schema = new StructType()
+        .add("col2", StringType).add("col4", LongType)
+        .add("col1", IntegerType).add("col3", IntegerType)
+      val partitionCols = Seq("col1", "col3")
+
+      withTable(tabName) {
+        spark.sql(
+          s"""
+             |CREATE TABLE $tabName
+             |USING json
+             |OPTIONS (
+             |  path '$path'
+             |)
+           """.stripMargin)
+        val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName))
+        val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
+        assert(tableSchema == schema)
+        val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
+        assert(partCols == partitionCols)
+
+        // Change the schema
+        val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString))
+          .toDF("newCol1", "newCol2")
+        newDF.write.format("json").partitionBy("newCol1").mode(SaveMode.Overwrite).save(path)
+
+        // No change on the schema
+        val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
+        val tableSchemaBeforeRefresh =
+          DDLUtils.getSchemaFromTableProperties(tableMetadataBeforeRefresh)
+        assert(tableSchemaBeforeRefresh == schema)
+        val partColsBeforeRefresh =
+          DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataBeforeRefresh)
+        assert(partColsBeforeRefresh == partitionCols)
+
+        // Refresh does not affect the schema
+        spark.catalog.refreshTable(tabName)
+
+        val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
+        val tableSchemaAfterRefresh =
+          DDLUtils.getSchemaFromTableProperties(tableMetadataAfterRefresh)
+        assert(tableSchemaAfterRefresh == schema)
+        val partColsAfterRefresh =
+          DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataAfterRefresh)
+        assert(partColsAfterRefresh == partitionCols)
+      }
+    }
+  }
+
   test("desc table for parquet data source table using in-memory catalog") {
     assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
     val tabName = "tab1"
@@ -413,7 +615,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
       assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
       assert(DDLUtils.getSchemaFromTableProperties(table) ==
-        Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+        new StructType().add("a", IntegerType).add("b", IntegerType))
       assert(DDLUtils.getPartitionColumnsFromTableProperties(table) ==
         Seq("a"))
     }
@@ -429,7 +631,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
       assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
       assert(DDLUtils.getSchemaFromTableProperties(table) ==
-        Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+        new StructType().add("a", IntegerType).add("b", IntegerType))
       assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
         Some(BucketSpec(5, Seq("a"), Seq("b"))))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/762366fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 111fb8b..571cae0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -191,10 +191,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
         sql("REFRESH TABLE jsonTable")
 
-        // Check that the refresh worked
+        // After refresh, schema is not changed.
         checkAnswer(
           sql("SELECT * FROM jsonTable"),
-          Row("a1", "b1", "c1"))
+          Row("a1", "b1"))
       }
     }
   }
@@ -703,7 +703,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           createDataSourceTable(
             sparkSession = spark,
             tableIdent = TableIdentifier("wide_schema"),
-            userSpecifiedSchema = Some(schema),
+            schema = schema,
             partitionColumns = Array.empty[String],
             bucketSpec = None,
             provider = "json",
@@ -988,7 +988,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       createDataSourceTable(
         sparkSession = spark,
         tableIdent = TableIdentifier("not_skip_hive_metadata"),
-        userSpecifiedSchema = Some(schema),
+        schema = schema,
         partitionColumns = Array.empty[String],
         bucketSpec = None,
         provider = "parquet",
@@ -1003,7 +1003,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       createDataSourceTable(
         sparkSession = spark,
         tableIdent = TableIdentifier("skip_hive_metadata"),
-        userSpecifiedSchema = Some(schema),
+        schema = schema,
         partitionColumns = Array.empty[String],
         bucketSpec = None,
         provider = "parquet",


