Repository: spark Updated Branches: refs/heads/master dadff5f07 -> de8a03e68
[SPARK-19459][SQL] Add Hive datatype (char/varchar) to StructField metadata ## What changes were proposed in this pull request? Reading from an existing ORC table which contains `char` or `varchar` columns can fail with a `ClassCastException` if the table metadata has been created using Spark. This is caused by the fact that Spark internally replaces `char` and `varchar` columns with a `string` column. This PR fixes this by adding the Hive type to the `StructField`'s metadata under the `HIVE_TYPE_STRING` key. This is picked up by the `HiveClient` and the ORC reader; see https://github.com/apache/spark/pull/16060 for more details on how the metadata is used. ## How was this patch tested? Added a regression test to `OrcSourceSuite`. Author: Herman van Hovell <hvanhov...@databricks.com> Closes #16804 from hvanhovell/SPARK-19459. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de8a03e6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de8a03e6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de8a03e6 Branch: refs/heads/master Commit: de8a03e68202647555e30fffba551f65bc77608d Parents: dadff5f Author: Herman van Hovell <hvanhov...@databricks.com> Authored: Fri Feb 10 11:06:57 2017 -0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Fri Feb 10 11:06:57 2017 -0800 ---------------------------------------------------------------------- .../spark/sql/catalyst/parser/AstBuilder.scala | 24 +++++++++++-- .../org/apache/spark/sql/types/package.scala | 10 +++++- .../spark/sql/sources/TableScanSuite.scala | 8 +++-- .../org/apache/spark/sql/hive/HiveUtils.scala | 14 ++------ .../spark/sql/hive/client/HiveClientImpl.scala | 8 ++--- .../spark/sql/hive/orc/OrcSourceSuite.scala | 37 +++++++++++++++++--- 6 files changed, 76 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/de8a03e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 3969fdb..bb07558 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -1457,8 +1457,28 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { */ override def visitColType(ctx: ColTypeContext): StructField = withOrigin(ctx) { import ctx._ - val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true) - if (STRING == null) structField else structField.withComment(string(STRING)) + + val builder = new MetadataBuilder + // Add comment to metadata + if (STRING != null) { + builder.putString("comment", string(STRING)) + } + // Add Hive type string to metadata. 
+ dataType match { + case p: PrimitiveDataTypeContext => + p.identifier.getText.toLowerCase match { + case "varchar" | "char" => + builder.putString(HIVE_TYPE_STRING, dataType.getText.toLowerCase) + case _ => + } + case _ => + } + + StructField( + identifier.getText, + typedVisit(dataType), + nullable = true, + builder.build()) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/de8a03e6/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala index 346a51e..f29cbc2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala @@ -21,4 +21,12 @@ package org.apache.spark.sql * Contains a type system for attributes produced by relations, including complex types like * structs, arrays and maps. */ -package object types +package object types { + /** + * Metadata key used to store the raw hive type string in the metadata of StructField. This + * is relevant for datatypes that do not have a direct Spark SQL counterpart, such as CHAR and + * VARCHAR. We need to preserve the original type in order to invoke the correct object + * inspector in Hive. 
+ */ + val HIVE_TYPE_STRING = "HIVE_TYPE_STRING" +} http://git-wip-us.apache.org/repos/asf/spark/blob/de8a03e6/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index 86bcb4d..b01d15e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -203,6 +203,10 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext { (2 to 10).map(i => Row(i, i - 1)).toSeq) test("Schema and all fields") { + def hiveMetadata(dt: String): Metadata = { + new MetadataBuilder().putString(HIVE_TYPE_STRING, dt).build() + } + val expectedSchema = StructType( StructField("string$%Field", StringType, true) :: StructField("binaryField", BinaryType, true) :: @@ -217,8 +221,8 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext { StructField("decimalField2", DecimalType(9, 2), true) :: StructField("dateField", DateType, true) :: StructField("timestampField", TimestampType, true) :: - StructField("varcharField", StringType, true) :: - StructField("charField", StringType, true) :: + StructField("varcharField", StringType, true, hiveMetadata("varchar(12)")) :: + StructField("charField", StringType, true, hiveMetadata("char(18)")) :: StructField("arrayFieldSimple", ArrayType(IntegerType), true) :: StructField("arrayFieldComplex", ArrayType( http://git-wip-us.apache.org/repos/asf/spark/blob/de8a03e6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 312ec67..13ab4e8 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -61,14 +61,6 @@ private[spark] object HiveUtils extends Logging { /** The version of hive used internally by Spark SQL. */ val hiveExecutionVersion: String = "1.2.1" - /** - * The property key that is used to store the raw hive type string in the metadata of StructField. - * For example, in the case where the Hive type is varchar, the type gets mapped to a string type - * in Spark SQL, but we need to preserve the original type in order to invoke the correct object - * inspector in Hive. - */ - val hiveTypeString: String = "HIVE_TYPE_STRING" - val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version") .doc("Version of the Hive metastore. Available options are " + s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.") @@ -465,8 +457,8 @@ private[spark] object HiveUtils extends Logging { /** Converts the native StructField to Hive's FieldSchema. 
*/ private def toHiveColumn(c: StructField): FieldSchema = { - val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) { - c.metadata.getString(HiveUtils.hiveTypeString) + val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) { + c.metadata.getString(HIVE_TYPE_STRING) } else { c.dataType.catalogString } @@ -482,7 +474,7 @@ private[spark] object HiveUtils extends Logging { throw new SparkException("Cannot recognize hive type string: " + hc.getType, e) } - val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build() + val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build() val field = StructField( name = hc.getName, dataType = columnType, http://git-wip-us.apache.org/repos/asf/spark/blob/de8a03e6/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index bf703a5..f0d01eb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.hive.HiveUtils -import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.util.{CircularBuffer, Utils} /** @@ -790,8 +790,8 @@ private[hive] class HiveClientImpl( .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] private def toHiveColumn(c: StructField): FieldSchema = { - val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) 
{ - c.metadata.getString(HiveUtils.hiveTypeString) + val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) { + c.metadata.getString(HIVE_TYPE_STRING) } else { c.dataType.catalogString } @@ -806,7 +806,7 @@ private[hive] class HiveClientImpl( throw new SparkException("Cannot recognize hive type string: " + hc.getType, e) } - val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build() + val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build() val field = StructField( name = hc.getName, dataType = columnType, http://git-wip-us.apache.org/repos/asf/spark/blob/de8a03e6/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index fe1e17d..59ea891 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -152,14 +152,41 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE") } - test("SPARK-18220: read Hive orc table with varchar column") { + test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") { val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client + val location = Utils.createTempDir() + val uri = location.toURI try { - hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS orc") - hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM (SELECT 1) t") - checkAnswer(spark.table("orc_varchar"), Row("a")) + hiveClient.runSqlHive( + """ + |CREATE EXTERNAL TABLE hive_orc( + | a STRING, + | b CHAR(10), + | c VARCHAR(10)) + |STORED AS 
orc""".stripMargin) + // Hive throws an exception if I assign the location in the create table statement. + hiveClient.runSqlHive( + s"ALTER TABLE hive_orc SET LOCATION '$uri'") + hiveClient.runSqlHive( + "INSERT INTO TABLE hive_orc SELECT 'a', 'b', 'c' FROM (SELECT 1) t") + + // We create a different table in Spark using the same schema which points to + // the same location. + spark.sql( + s""" + |CREATE EXTERNAL TABLE spark_orc( + | a STRING, + | b CHAR(10), + | c VARCHAR(10)) + |STORED AS orc + |LOCATION '$uri'""".stripMargin) + val result = Row("a", "b ", "c") + checkAnswer(spark.table("hive_orc"), result) + checkAnswer(spark.table("spark_orc"), result) } finally { - hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar") + hiveClient.runSqlHive("DROP TABLE IF EXISTS hive_orc") + hiveClient.runSqlHive("DROP TABLE IF EXISTS spark_orc") + Utils.deleteRecursively(location) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org