Repository: spark
Updated Branches:
  refs/heads/branch-2.1 6e044ab9a -> 3de93fb48


[SPARK-18220][SQL] read Hive orc table with varchar column should not fail

## What changes were proposed in this pull request?

Spark SQL only has `StringType`, so when reading a Hive table with a varchar 
column, we read that column as `StringType`. However, we still need to use the 
varchar `ObjectInspector` to read a varchar column in a Hive table, which means 
we need to know the actual column type on the Hive side.

In Spark 2.1, after https://github.com/apache/spark/pull/14363 , we parse the 
Hive type string into a Catalyst type, which means the actual column type on 
the Hive side is erased. As a result, we may use the string `ObjectInspector` 
to read a varchar column, and the read fails.

This PR keeps the original Hive column type string in the metadata of 
`StructField`, and uses it when we convert the field to a Hive column.

## How was this patch tested?

A newly added regression test in `OrcSourceSuite.scala`.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #16060 from cloud-fan/varchar.

(cherry picked from commit 3f03c90a807872d47588f3c3920769b8978033bf)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3de93fb4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3de93fb4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3de93fb4

Branch: refs/heads/branch-2.1
Commit: 3de93fb480ce316e9b35a025dd350123084c3565
Parents: 6e044ab
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Wed Nov 30 09:47:30 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Nov 30 09:47:36 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/hive/HiveUtils.scala  |  8 ++++++++
 .../apache/spark/sql/hive/MetastoreRelation.scala    |  7 ++++++-
 .../spark/sql/hive/client/HiveClientImpl.scala       | 15 ++++++++++++---
 ...veExternalCatalogBackwardCompatibilitySuite.scala |  4 ++--
 .../apache/spark/sql/hive/orc/OrcSourceSuite.scala   | 12 ++++++++++++
 5 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3de93fb4/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 81cd65c..26b1994 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -54,6 +54,14 @@ private[spark] object HiveUtils extends Logging {
   /** The version of hive used internally by Spark SQL. */
   val hiveExecutionVersion: String = "1.2.1"
 
+  /**
+   * The property key that is used to store the raw hive type string in the 
metadata of StructField.
+   * For example, in the case where the Hive type is varchar, the type gets 
mapped to a string type
+   * in Spark SQL, but we need to preserve the original type in order to 
invoke the correct object
+   * inspector in Hive.
+   */
+  val hiveTypeString: String = "HIVE_TYPE_STRING"
+
   val HIVE_METASTORE_VERSION = 
SQLConfigBuilder("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
         s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")

http://git-wip-us.apache.org/repos/asf/spark/blob/3de93fb4/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index da809cf..3bbac05 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -61,7 +61,12 @@ private[hive] case class MetastoreRelation(
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: 
sparkSession :: Nil
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment.orNull)
   }
 
   // TODO: merge this with HiveClientImpl#toHiveTable

http://git-wip-us.apache.org/repos/asf/spark/blob/3de93fb4/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 68dcfd8..590029a 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -46,7 +46,8 @@ import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -748,7 +749,12 @@ private[hive] class HiveClientImpl(
       .asInstanceOf[Class[_ <: 
org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment().orNull)
   }
 
   private def fromHiveColumn(hc: FieldSchema): StructField = {
@@ -758,10 +764,13 @@ private[hive] class HiveClientImpl(
       case e: ParseException =>
         throw new SparkException("Cannot recognize hive type string: " + 
hc.getType, e)
     }
+
+    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, 
hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,
-      nullable = true)
+      nullable = true,
+      metadata = metadata)
     Option(hc.getComment).map(field.withComment).getOrElse(field)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3de93fb4/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
index cca4480..c5753ce 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
@@ -205,7 +205,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends 
QueryTest
   test("make sure we can read table created by old version of Spark") {
     for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
       val readBack = getTableMetadata(tbl.identifier.table)
-      assert(readBack.schema == expectedSchema)
+      assert(readBack.schema.sameType(expectedSchema))
 
       if (tbl.tableType == CatalogTableType.EXTERNAL) {
         // trim the URI prefix
@@ -235,7 +235,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends 
QueryTest
       sql(s"ALTER TABLE ${tbl.identifier} RENAME TO $newName")
 
       val readBack = getTableMetadata(newName)
-      assert(readBack.schema == expectedSchema)
+      assert(readBack.schema.sameType(expectedSchema))
 
       // trim the URI prefix
       val actualTableLocation = new 
URI(readBack.storage.locationUri.get).getPath

http://git-wip-us.apache.org/repos/asf/spark/blob/3de93fb4/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 12f9480..2b40469 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -150,6 +151,17 @@ abstract class OrcSuite extends QueryTest with 
TestHiveSingleton with BeforeAndA
   test("SPARK-18433: Improve DataSource option keys to be more 
case-insensitive") {
     assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == 
"NONE")
   }
+
+  test("SPARK-18220: read Hive orc table with varchar column") {
+    val hiveClient = 
spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    try {
+      hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS 
orc")
+      hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM 
(SELECT 1) t")
+      checkAnswer(spark.table("orc_varchar"), Row("a"))
+    } finally {
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar")
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to