Repository: spark Updated Branches: refs/heads/master 0077bfcb9 -> 4d4d0de7f
[SPARK-19279][SQL][FOLLOW-UP] Infer Schema for Hive Serde Tables ### What changes were proposed in this pull request? `table.schema` is never empty for partitioned tables, because it also contains the partition columns, even if the original table does not define any data columns. This PR fixes the issue by checking only the data columns (the new `CatalogTable.dataSchema`) when deciding whether the schema of a Hive serde table needs to be inferred. ### How was this patch tested? Added a test case. Author: gatorsmile <gatorsm...@gmail.com> Closes #16848 from gatorsmile/inferHiveSerdeSchema. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d4d0de7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d4d0de7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d4d0de7 Branch: refs/heads/master Commit: 4d4d0de7f64cefbca28dc532b7864de9626aa241 Parents: 0077bfc Author: gatorsmile <gatorsm...@gmail.com> Authored: Wed Feb 8 10:11:44 2017 -0500 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Wed Feb 8 10:11:44 2017 -0500 ---------------------------------------------------------------------- .../spark/sql/catalyst/catalog/interface.scala | 8 ++++ .../org/apache/spark/sql/hive/HiveUtils.scala | 2 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 44 ++++++++++++++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4d4d0de7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index a8fa78d..353e595 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -194,6 +194,14 @@ case class CatalogTable( 
StructType(partitionFields) } + /** + * schema of this table's data columns + */ + def dataSchema: StructType = { + val dataFields = schema.dropRight(partitionColumnNames.length) + StructType(dataFields) + } + /** Return the database this table was specified to belong to, assuming it exists. */ def database: String = identifier.database.getOrElse { throw new AnalysisException(s"table $identifier did not specify database") http://git-wip-us.apache.org/repos/asf/spark/blob/4d4d0de7/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 30abc62..312ec67 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -580,7 +580,7 @@ private[spark] object HiveUtils extends Logging { * CatalogTable. 
*/ def inferSchema(table: CatalogTable): CatalogTable = { - if (DDLUtils.isDatasourceTable(table) || table.schema.nonEmpty) { + if (DDLUtils.isDatasourceTable(table) || table.dataSchema.nonEmpty) { table } else { val hiveTable = toHiveTable(table) http://git-wip-us.apache.org/repos/asf/spark/blob/4d4d0de7/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index c262095..cf1fe2b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.SparkContext import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.execution.command.CreateTableCommand import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.hive.HiveExternalCatalog._ import org.apache.spark.sql.hive.client.HiveClient @@ -1308,6 +1309,49 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv } } + test("Infer schema for Hive serde tables") { + val tableName = "tab1" + val avroSchema = + """{ + | "name": "test_record", + | "type": "record", + | "fields": [ { + | "name": "f0", + | "type": "int" + | }] + |} + """.stripMargin + + Seq(true, false).foreach { isPartitioned => + withTable(tableName) { + val partitionClause = if (isPartitioned) "PARTITIONED BY (ds STRING)" else "" + // Creates the (non-)partitioned Avro table + val plan = sql( + s""" + |CREATE TABLE $tableName + |$partitionClause + |ROW FORMAT SERDE 
'org.apache.hadoop.hive.serde2.avro.AvroSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' + |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema') + """.stripMargin + ).queryExecution.analyzed + + assert(plan.isInstanceOf[CreateTableCommand] && + plan.asInstanceOf[CreateTableCommand].table.dataSchema.nonEmpty) + + if (isPartitioned) { + sql(s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1") + checkAnswer(spark.table(tableName), Row(1, "a")) + } else { + sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1") + checkAnswer(spark.table(tableName), Row(1)) + } + } + } + } + private def withDebugMode(f: => Unit): Unit = { val previousValue = sparkSession.sparkContext.conf.get(DEBUG_MODE) try { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org