Repository: spark
Updated Branches:
  refs/heads/master d6f76eb34 -> 0c88e8d37


[SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1

### What changes were proposed in this pull request?
Before this PR, Spark is unable to read a partitioned table created by Spark 2.1
when the table schema does not put the partition columns at the end of the
schema, because reading the schema trips this assertion:
[assert(partitionFields.map(_.name) == partitionColumnNames)](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L234-L236)

When reading the table metadata from the metastore, we therefore also need to
reorder the columns so that the partition columns come last.
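
For illustration, a minimal standalone sketch of the reordering (the real logic
is `reorderSchema` in HiveExternalCatalog, shown in the diff below; the error
handling and the `reorder` name here are simplified for the example):

```scala
import org.apache.spark.sql.types.StructType

// Move the declared partition columns to the end of the schema, keeping the
// data columns in their original relative order.
def reorder(schema: StructType, partColumnNames: Seq[String]): StructType = {
  val partitionFields = partColumnNames.map { partCol =>
    schema.find(_.name == partCol).getOrElse(
      sys.error(s"Partition column $partCol not found in ${schema.catalogString}"))
  }
  StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
}

// A Spark 2.1 layout with partition columns interleaved among the data columns:
val schema = new StructType()
  .add("col1", "int")
  .add("partCol1", "int")
  .add("partCol2", "string")
  .add("col2", "string")

// Returns col1, col2, partCol1, partCol2 -- the order the assert above expects.
reorder(schema, Seq("partCol1", "partCol2"))
```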

### How was this patch tested?
Added test cases to check both Hive-serde and data source tables.

Author: gatorsmile <[email protected]>

Closes #18295 from gatorsmile/reorderReadSchema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c88e8d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c88e8d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c88e8d3

Branch: refs/heads/master
Commit: 0c88e8d37224713199ca5661c2cd57f5918dcb9a
Parents: d6f76eb
Author: gatorsmile <[email protected]>
Authored: Wed Jun 14 16:28:06 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Wed Jun 14 16:28:06 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveExternalCatalog.scala    | 31 ++++++++++++++++----
 .../sql/hive/HiveExternalCatalogSuite.scala     | 26 ++++++++++++++++
 2 files changed, 52 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c88e8d3/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 7fcf06d..1945367 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -729,6 +729,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) })
   }
 
+  // Reorder table schema to put partition columns at the end. Before Spark 2.2, the partition
+  // columns are not put at the end of schema. We need to reorder it when reading the schema
+  // from the table properties.
+  private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
+    val partitionFields = partColumnNames.map { partCol =>
+      schema.find(_.name == partCol).getOrElse {
+        throw new AnalysisException("The metadata is corrupted. Unable to find the " +
+          s"partition column names from the schema. schema: ${schema.catalogString}. " +
+          s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
+      }
+    }
+    StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
+  }
+
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
@@ -738,10 +752,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // schema from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
       val schemaFromTableProps = getSchemaFromTableProperties(table)
-      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+      val partColumnNames = getPartitionColumnsFromTableProperties(table)
+      val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
         hiveTable.copy(
-          schema = schemaFromTableProps,
-          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          schema = reorderedSchema,
+          partitionColumnNames = partColumnNames,
           bucketSpec = getBucketSpecFromTableProperties(table))
       } else {
        // Hive metastore may change the table schema, e.g. schema inference. If the table
@@ -771,11 +788,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
+    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val partColumnNames = getPartitionColumnsFromTableProperties(table)
+    val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
     table.copy(
       provider = Some(provider),
       storage = storageWithLocation,
-      schema = getSchemaFromTableProperties(table),
-      partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+      schema = reorderedSchema,
+      partitionColumnNames = partColumnNames,
       bucketSpec = getBucketSpecFromTableProperties(table),
       tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0c88e8d3/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index bd54c04..d43534d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -63,4 +63,30 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
     assert(DDLUtils.isHiveTable(externalCatalog.getTable("db1", "hive_tbl")))
   }
+
+  Seq("parquet", "hive").foreach { format =>
+    test(s"Partition columns should be put at the end of table schema for the 
format $format") {
+      val catalog = newBasicCatalog()
+      val newSchema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string")
+      val table = CatalogTable(
+        identifier = TableIdentifier("tbl", Some("db1")),
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType()
+          .add("col1", "int")
+          .add("partCol1", "int")
+          .add("partCol2", "string")
+          .add("col2", "string"),
+        provider = Some(format),
+        partitionColumnNames = Seq("partCol1", "partCol2"))
+      catalog.createTable(table, ignoreIfExists = false)
+
+      val restoredTable = externalCatalog.getTable("db1", "tbl")
+      assert(restoredTable.schema == newSchema)
+    }
+  }
 }

