Repository: spark
Updated Branches:
  refs/heads/master 43d71d965 -> 01a8e4627


[SPARK-21769][SQL] Add a table-specific option for always respecting schemas inferred/controlled by Spark SQL

## What changes were proposed in this pull request?
For Hive-serde tables, we always respect the schema stored in the Hive metastore, 
because the schema could be altered by other engines that share the same 
metastore. Thus, when the schemas differ (ignoring nullability and case), we 
always trust the metastore-controlled schema for Hive-serde tables. However, in 
some scenarios the Hive metastore can INCORRECTLY overwrite the schema, e.g., 
when the table's serde differs from the Hive metastore's built-in serde.

The proposed solution is to introduce a table-specific option for such 
scenarios. For a specific table, users can make Spark always respect the 
Spark-inferred/controlled schema instead of trusting the metastore-controlled 
one. By default, we trust the Hive metastore-controlled schema, as shown in the 
sketch below.
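
For example (a minimal sketch mirroring the new test below; the table name and 
location are hypothetical, and a SparkSession named `spark` is assumed), a user 
opts a single table into the Spark-controlled schema through a serde property:

```scala
// Hypothetical table and location; the serde property tells Spark to keep
// its own schema for this table instead of the metastore's rewritten one.
spark.sql(
  """
    |CREATE TABLE decimal_tab
    |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
    |STORED AS
    |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    |LOCATION '/path/to/avro/data'
  """.stripMargin)
```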

## How was this patch tested?
Added a cross-version test case.

Author: gatorsmile <gatorsm...@gmail.com>

Closes #19003 from gatorsmile/respectSparkSchema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01a8e462
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01a8e462
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01a8e462

Branch: refs/heads/master
Commit: 01a8e46278dbfde916a74b6fd51e08804602e1cf
Parents: 43d71d9
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Tue Aug 22 13:12:59 2017 -0700
Committer: Sameer Agarwal <samee...@apache.org>
Committed: Tue Aug 22 13:12:59 2017 -0700

----------------------------------------------------------------------
 .../execution/datasources/SourceOptions.scala   |  50 +++++++++++++++++++
 .../spark/sql/hive/HiveExternalCatalog.scala    |  11 ++--
 .../src/test/resources/avroDecimal/decimal.avro | Bin 0 -> 203 bytes
 .../spark/sql/hive/client/VersionsSuite.scala   |  41 +++++++++++++++
 4 files changed, 97 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01a8e462/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala
new file mode 100644
index 0000000..c98c0b2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+/**
+ * Options for the data source.
+ */
+class SourceOptions(
+     @transient private val parameters: CaseInsensitiveMap[String])
+  extends Serializable {
+  import SourceOptions._
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+  // A flag to disable saving a data source table's metadata in hive compatible way.
+  val skipHiveMetadata: Boolean = parameters
+    .get(SKIP_HIVE_METADATA).map(_.toBoolean).getOrElse(DEFAULT_SKIP_HIVE_METADATA)
+
+  // A flag to always respect the Spark schema restored from the table properties
+  val respectSparkSchema: Boolean = parameters
+    .get(RESPECT_SPARK_SCHEMA).map(_.toBoolean).getOrElse(DEFAULT_RESPECT_SPARK_SCHEMA)
+}
+
+
+object SourceOptions {
+
+  val SKIP_HIVE_METADATA = "skipHiveMetadata"
+  val DEFAULT_SKIP_HIVE_METADATA = false
+
+  val RESPECT_SPARK_SCHEMA = "respectSparkSchema"
+  val DEFAULT_RESPECT_SPARK_SCHEMA = false
+
+}

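A minimal usage sketch of the new options class (the map contents below are 
hypothetical): keys are matched case-insensitively via CaseInsensitiveMap, and 
unset keys fall back to their defaults.

```scala
import org.apache.spark.sql.execution.datasources.SourceOptions

// Key case is irrelevant: CaseInsensitiveMap normalizes the lookup.
val opts = new SourceOptions(Map("RESPECTSPARKSCHEMA" -> "true"))
assert(opts.respectSparkSchema)  // explicitly enabled
assert(!opts.skipHiveMetadata)   // unset, so DEFAULT_SKIP_HIVE_METADATA (false)
```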
http://git-wip-us.apache.org/repos/asf/spark/blob/01a8e462/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index bdbb8bc..34af37c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -260,6 +260,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
     // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
     val provider = table.provider.get
+    val options = new SourceOptions(table.storage.properties)
 
     // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
     // support, no column nullability, etc., we should do some extra works before saving table
@@ -325,11 +326,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     val qualifiedTableName = table.identifier.quotedString
     val maybeSerde = HiveSerDe.sourceToSerDe(provider)
-    val skipHiveMetadata = table.storage.properties
-      .getOrElse("skipHiveMetadata", "false").toBoolean
 
     val (hiveCompatibleTable, logMessage) = maybeSerde match {
-      case _ if skipHiveMetadata =>
+      case _ if options.skipHiveMetadata =>
         val message =
           s"Persisting data source table $qualifiedTableName into Hive 
metastore in" +
             "Spark SQL specific format, which is NOT compatible with Hive."
@@ -737,6 +736,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
+    val options = new SourceOptions(table.storage.properties)
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
       tracksPartitionsInCatalog = true)
@@ -748,7 +748,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
       val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
 
-      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema) ||
+          options.respectSparkSchema) {
         hiveTable.copy(
           schema = reorderedSchema,
           partitionColumnNames = partColumnNames,

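A condensed sketch of the restore decision above: the Spark schema stored in 
the table properties wins either when it already matches the metastore schema 
(ignoring case and nullability) or when the table opts in via 
respectSparkSchema; otherwise the metastore schema is kept. The `schemasMatch` 
parameter is a hypothetical stand-in for the package-private 
`DataType.equalsIgnoreCaseAndNullability`.

```scala
import org.apache.spark.sql.types.StructType

// Returns the schema a restored Hive-serde table should expose.
def chooseSchema(
    sparkSchema: StructType,
    metastoreSchema: StructType,
    respectSparkSchema: Boolean)(
    schemasMatch: (StructType, StructType) => Boolean): StructType = {
  if (schemasMatch(sparkSchema, metastoreSchema) || respectSparkSchema) {
    sparkSchema      // trust the Spark-controlled schema from table properties
  } else {
    metastoreSchema  // fall back to what the Hive metastore reports
  }
}
```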
http://git-wip-us.apache.org/repos/asf/spark/blob/01a8e462/sql/hive/src/test/resources/avroDecimal/decimal.avro
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/avroDecimal/decimal.avro b/sql/hive/src/test/resources/avroDecimal/decimal.avro
new file mode 100755
index 0000000..6da423f
Binary files /dev/null and b/sql/hive/src/test/resources/avroDecimal/decimal.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/01a8e462/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 072e538..cbbe869 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -763,6 +763,47 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }
 
+    test(s"$version: read avro file containing decimal") {
+      val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
+      val location = new File(url.getFile)
+
+      val tableName = "tab1"
+      val avroSchema =
+        """{
+          |  "name": "test_record",
+          |  "type": "record",
+          |  "fields": [ {
+          |    "name": "f0",
+          |    "type": [
+          |      "null",
+          |      {
+          |        "precision": 38,
+          |        "scale": 2,
+          |        "type": "bytes",
+          |        "logicalType": "decimal"
+          |      }
+          |    ]
+          |  } ]
+          |}
+        """.stripMargin
+      withTable(tableName) {
+        versionSpark.sql(
+          s"""
+             |CREATE TABLE $tableName
+             |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+             |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+             |STORED AS
+             |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+             |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+             |LOCATION '$location'
+             |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+           """.stripMargin
+        )
+        assert(versionSpark.table(tableName).collect() ===
+          versionSpark.sql("SELECT 1.30").collect())
+      }
+    }
+
     // TODO: add more tests.
   }
 }

