Repository: spark
Updated Branches:
  refs/heads/master 38b9e6962 -> a5f02b002


[SPARK-18647][SQL] do not put provider in table properties for Hive serde table

## What changes were proposed in this pull request?

In Spark 2.1, we made Hive serde tables case-preserving by putting the table 
metadata in table properties, as we did for data source tables. However, we 
should not put the table provider there, as it breaks forward compatibility. 
For example, if we create a Hive serde table with Spark 2.1 using `sql("create 
table test stored as parquet as select 1")`, we will fail to read it with 
Spark 2.0, because Spark 2.0 mistakenly treats it as a data source table when 
it finds a `provider` entry in the table properties.

Logically, a Hive serde table's provider is always `hive`, so we don't need to 
store it in table properties. This PR removes it.
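
As a concrete sketch of the compatibility scenario above (the table name and 
the assumption that both Spark versions point at the same Hive metastore are 
illustrative, not part of this commit):

```scala
// Spark 2.1 session: creates a Hive serde table. Before this patch, the
// catalog also wrote a `provider` entry into the table properties stored in
// the Hive metastore.
spark.sql("CREATE TABLE test STORED AS PARQUET AS SELECT 1")

// Spark 2.0 session reading the same metastore: it sees the `provider`
// property, mistakenly treats `test` as a data source table, and fails to
// read it.
spark.sql("SELECT * FROM test").show()
```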

## How was this patch tested?

Manually tested the forward-compatibility issue.
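
The commit also adds a regression test in `HiveExternalCatalogSuite` (see the 
diff below). A minimal sketch of the check it performs, with `catalog`, 
`externalCatalog`, and the `db1.hive_tbl` setup taken from that test code:

```scala
// After creating a Hive serde table through the external catalog, the raw
// metastore table must not carry the data source provider property, while the
// restored CatalogTable still reports the `hive` provider.
val rawTable = externalCatalog.client.getTable("db1", "hive_tbl")
assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
assert(externalCatalog.getTable("db1", "hive_tbl").provider == Some(DDLUtils.HIVE_PROVIDER))
```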

Author: Wenchen Fan <wenc...@databricks.com>

Closes #16080 from cloud-fan/hive.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5f02b00
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5f02b00
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5f02b00

Branch: refs/heads/master
Commit: a5f02b00291e0a22429a3dca81f12cf6d38fea0b
Parents: 38b9e69
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Dec 2 12:54:12 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Dec 2 12:54:12 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveExternalCatalog.scala    | 80 ++++++++++----------
 .../sql/hive/HiveExternalCatalogSuite.scala     | 18 +++++
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |  2 -
 3 files changed, 59 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a5f02b00/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 1a9943b..0658832 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 import java.net.URI
 import java.util
 
+import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -219,9 +220,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
           // table location for tables in default database, while we expect to 
use the location of
           // default database.
           storage = tableDefinition.storage.copy(locationUri = tableLocation),
-          // Here we follow data source tables and put table metadata like 
provider, schema, etc. in
-          // table properties, so that we can work around the Hive metastore 
issue about not case
-          // preserving and make Hive serde table support mixed-case column 
names.
+          // Here we follow data source tables and put table metadata like 
table schema, partition
+          // columns etc. in table properties, so that we can work around the 
Hive metastore issue
+          // about not case preserving and make Hive serde table support 
mixed-case column names.
           properties = tableDefinition.properties ++ 
tableMetaToTableProps(tableDefinition))
         client.createTable(tableWithDataSourceProps, ignoreIfExists)
       } else {
@@ -233,10 +234,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
   }
 
   private def createDataSourceTable(table: CatalogTable, ignoreIfExists: 
Boolean): Unit = {
+    // data source table always have a provider, it's guaranteed by 
`DDLUtils.isDatasourceTable`.
+    val provider = table.provider.get
+
     // To work around some hive metastore issues, e.g. not case-preserving, 
bad decimal type
     // support, no column nullability, etc., we should do some extra works 
before saving table
     // metadata into Hive metastore:
-    //  1. Put table metadata like provider, schema, etc. in table properties.
+    //  1. Put table metadata like table schema, partition columns, etc. in 
table properties.
     //  2. Check if this table is hive compatible.
     //    2.1  If it's not hive compatible, set location URI, schema, 
partition columns and bucket
     //         spec to empty and save table metadata to Hive.
@@ -244,6 +248,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
     //         it to Hive. If it fails, treat it as not hive compatible and go 
back to 2.1
     val tableProperties = tableMetaToTableProps(table)
 
+    // put table provider and partition provider in table properties.
+    tableProperties.put(DATASOURCE_PROVIDER, provider)
+    if (table.tracksPartitionsInCatalog) {
+      tableProperties.put(TABLE_PARTITION_PROVIDER, 
TABLE_PARTITION_PROVIDER_CATALOG)
+    }
+
     // Ideally we should also put `locationUri` in table properties like 
provider, schema, etc.
     // However, in older version of Spark we already store table location in 
storage properties
     // with key "path". Here we keep this behaviour for backward compatibility.
@@ -290,7 +300,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
     }
 
     val qualifiedTableName = table.identifier.quotedString
-    val maybeSerde = HiveSerDe.sourceToSerDe(table.provider.get)
+    val maybeSerde = HiveSerDe.sourceToSerDe(provider)
     val skipHiveMetadata = table.storage.properties
       .getOrElse("skipHiveMetadata", "false").toBoolean
 
@@ -315,7 +325,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
         (Some(newHiveCompatibleMetastoreTable(serde)), message)
 
       case _ =>
-        val provider = table.provider.get
         val message =
           s"Couldn't find corresponding Hive SerDe for data source provider 
$provider. " +
             s"Persisting data source table $qualifiedTableName into Hive 
metastore in " +
@@ -349,21 +358,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
   /**
    * Data source tables may be non Hive compatible and we need to store table 
metadata in table
    * properties to workaround some Hive metastore limitations.
-   * This method puts table provider, partition provider, schema, partition 
column names, bucket
-   * specification into a map, which can be used as table properties later.
+   * This method puts table schema, partition column names, bucket 
specification into a map, which
+   * can be used as table properties later.
    */
-  private def tableMetaToTableProps(table: CatalogTable): 
scala.collection.Map[String, String] = {
-    // data source table always have a provider, it's guaranteed by 
`DDLUtils.isDatasourceTable`.
-    val provider = table.provider.get
+  private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, 
String] = {
     val partitionColumns = table.partitionColumnNames
     val bucketSpec = table.bucketSpec
 
-    val properties = new scala.collection.mutable.HashMap[String, String]
-    properties.put(DATASOURCE_PROVIDER, provider)
-    if (table.tracksPartitionsInCatalog) {
-      properties.put(TABLE_PARTITION_PROVIDER, 
TABLE_PARTITION_PROVIDER_CATALOG)
-    }
-
+    val properties = new mutable.HashMap[String, String]
     // Serialized JSON schema string may be too long to be stored into a 
single metastore table
     // property. In this case, we split the JSON string and store each part as 
a separate table
     // property.
@@ -617,14 +619,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
 
     if (table.tableType != VIEW) {
       table.properties.get(DATASOURCE_PROVIDER) match {
-        // No provider in table properties, which means this table is created 
by Spark prior to 2.1,
-        // or is created at Hive side.
+        // No provider in table properties, which means this is a Hive serde 
table.
         case None =>
-          table = table.copy(
-            provider = Some(DDLUtils.HIVE_PROVIDER), tracksPartitionsInCatalog 
= true)
-
-        // This is a Hive serde table created by Spark 2.1 or higher versions.
-        case Some(DDLUtils.HIVE_PROVIDER) =>
           table = restoreHiveSerdeTable(table)
 
         // This is a regular data source table.
@@ -637,7 +633,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
     val statsProps = 
table.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
 
     if (statsProps.nonEmpty) {
-      val colStats = new scala.collection.mutable.HashMap[String, ColumnStat]
+      val colStats = new mutable.HashMap[String, ColumnStat]
 
       // For each column, recover its column stats. Note that this is 
currently a O(n^2) operation,
       // but given the number of columns it usually not enormous, this is 
probably OK as a start.
@@ -674,21 +670,27 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
       provider = Some(DDLUtils.HIVE_PROVIDER),
       tracksPartitionsInCatalog = true)
 
-    val schemaFromTableProps = getSchemaFromTableProperties(table)
-    if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, 
table.schema)) {
-      hiveTable.copy(
-        schema = schemaFromTableProps,
-        partitionColumnNames = getPartitionColumnsFromTableProperties(table),
-        bucketSpec = getBucketSpecFromTableProperties(table))
+    // If this is a Hive serde table created by Spark 2.1 or higher versions, 
we should restore its
+    // schema from table properties.
+    if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
+      val schemaFromTableProps = getSchemaFromTableProperties(table)
+      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, 
table.schema)) {
+        hiveTable.copy(
+          schema = schemaFromTableProps,
+          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          bucketSpec = getBucketSpecFromTableProperties(table))
+      } else {
+        // Hive metastore may change the table schema, e.g. schema inference. 
If the table
+        // schema we read back is different(ignore case and nullability) from 
the one in table
+        // properties which was written when creating table, we should respect 
the table schema
+        // from hive.
+        logWarning(s"The table schema given by Hive 
metastore(${table.schema.simpleString}) is " +
+          "different from the schema when this table was created by Spark SQL" 
+
+          s"(${schemaFromTableProps.simpleString}). We have to fall back to 
the table schema " +
+          "from Hive metastore which is not case preserving.")
+        hiveTable
+      }
     } else {
-      // Hive metastore may change the table schema, e.g. schema inference. If 
the table
-      // schema we read back is different(ignore case and nullability) from 
the one in table
-      // properties which was written when creating table, we should respect 
the table schema
-      // from hive.
-      logWarning(s"The table schema given by Hive 
metastore(${table.schema.simpleString}) is " +
-        "different from the schema when this table was created by Spark SQL" +
-        s"(${schemaFromTableProps.simpleString}). We have to fall back to the 
table schema from " +
-        "Hive metastore which is not case preserving.")
       hiveTable
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a5f02b00/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index efa0beb..6fee458 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -20,8 +20,11 @@ package org.apache.spark.sql.hive
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.types.StructType
 
 /**
  * Test suite for the [[HiveExternalCatalog]].
@@ -52,4 +55,19 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     assert(selectedPartitions.length == 1)
     assert(selectedPartitions.head.spec == part1.spec)
   }
+
+  test("SPARK-18647: do not put provider in table properties for Hive serde 
table") {
+    val catalog = newBasicCatalog()
+    val hiveTable = CatalogTable(
+      identifier = TableIdentifier("hive_tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = storageFormat,
+      schema = new StructType().add("col1", "int").add("col2", "string"),
+      provider = Some("hive"))
+    catalog.createTable(hiveTable, ignoreIfExists = false)
+
+    val rawTable = externalCatalog.client.getTable("db1", "hive_tbl")
+    
assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
+    assert(externalCatalog.getTable("db1", "hive_tbl").provider == 
Some(DDLUtils.HIVE_PROVIDER))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a5f02b00/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 7abc4d9..0a280b4 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import java.io.File
-
 import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType

