Repository: spark
Updated Branches:
  refs/heads/branch-2.0 25006c8bc -> d5d2457e4


[SPARK-15968][SQL] Nonempty partitioned metastore tables are not cached

This PR backports your fix (https://github.com/apache/spark/pull/13818) to 
branch-2.0.

This PR addresses 
[SPARK-15968](https://issues.apache.org/jira/browse/SPARK-15968).

## What changes were proposed in this pull request?

The `getCached` method of 
[HiveMetastoreCatalog](https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala)
 computes `pathsInMetastore` from the metastore relation's catalog table. This 
only returns the table base path, which is incomplete/inaccurate for a nonempty 
partitioned table. As a result, cached lookups on nonempty partitioned tables 
always miss.
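To illustrate the miss, here is a toy sketch in plain Scala (not Spark's actual code; the paths and names are hypothetical) of comparing the metastore's path list against the cached relation's paths:

```scala
// Toy sketch of the pre-patch cache check (NOT Spark's actual implementation).
// A cached HadoopFsRelation for a nonempty partitioned table holds its
// partition data locations, but the pre-patch lookup compared against only
// the table's base path, so the two path sets could never match.
object CacheMissSketch {
  // A cached relation is reusable only if its paths match the metastore's paths.
  def useCached(pathsInMetastore: Seq[String], cachedRelationPaths: Seq[String]): Boolean =
    pathsInMetastore.toSet == cachedRelationPaths.toSet

  def main(args: Array[String]): Unit = {
    val basePath = Seq("hdfs://nn/warehouse/t")              // what getCached computed pre-patch
    val partitionPaths = Seq("hdfs://nn/warehouse/t/part=0", // what the cached relation
                             "hdfs://nn/warehouse/t/part=1") // was actually built from
    // Nonempty partitioned table: base path vs. partition paths -> always a miss.
    assert(!useCached(basePath, partitionPaths))
    // Passing the partition paths in (the fix below): a hit.
    assert(useCached(partitionPaths, partitionPaths))
  }
}
```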

Rather than getting `pathsInMetastore` from

    metastoreRelation.catalogTable.storage.locationUri.toSeq

I modified the `getCached` method to take a `pathsInMetastore` argument. 
Callers pass in the paths computed from calls to the Hive metastore. This is 
how `getCached` was implemented in Spark 1.5:

https://github.com/apache/spark/blob/e0c3212a9b42e3e704b070da4ac25b68c584427f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L444

I also added a call in `InsertIntoHiveTable.scala` to invalidate the table from 
the SQL session catalog.

## How was this patch tested?

I've added a new unit test to `parquetSuites.scala`:

    SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached relation

Note that the only difference between this new test and the one immediately 
above it in the file is that the new test populates its partitioned table with 
a single row, while the existing test leaves the table empty. This reveals a 
subtle gap in test coverage that existed before this patch.

I also modified a different but related unit test in `parquetSuites.scala`:

    SPARK-15248: explicitly added partitions should be readable

This unit test asserts that Spark SQL returns data from a table partition 
immediately after that partition is added, even when the data was placed there 
outside of a metastore query. I changed the test so that, instead of adding 
the data as a Parquet file saved in the partition's location, the data is 
added through a SQL `INSERT` query. I made this change because I could find no 
way to efficiently support partitioned table caching without failing that test.

In addition to my primary motivation, I can offer a few reasons I believe this 
is an acceptable weakening of that test. First, it still validates a fix for 
[SPARK-15248](https://issues.apache.org/jira/browse/SPARK-15248), the issue for 
which it was written. Second, the assertion it makes is stronger than the one 
required for non-partitioned tables. If you write data to the storage location 
of a non-partitioned metastore table without using a proper SQL DML query, a 
subsequent query will not return that data. I believe this is an intentional 
limitation put in place to make table caching feasible, but I'm only 
speculating.
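The caching behavior described above can be modeled with a small, self-contained sketch (again a toy, not Spark's implementation; all names are hypothetical): the catalog caches a table's file listing on first scan, a SQL DML write triggers a refresh, but an out-of-band write to the storage location does not, so its data stays invisible until an explicit refresh.

```scala
import scala.collection.mutable

// Toy model of the relation cache (NOT Spark's implementation).
object OutOfBandWriteSketch {
  private val filesOnDisk = mutable.Map[String, mutable.Buffer[String]]()
  private val cachedListing = mutable.Map[String, Seq[String]]()

  def createTable(t: String): Unit = filesOnDisk(t) = mutable.Buffer()
  def refreshTable(t: String): Unit = cachedListing.remove(t)

  // A SQL INSERT goes through the catalog, so it also refreshes the cached listing.
  def insertViaSql(t: String, file: String): Unit = { filesOnDisk(t) += file; refreshTable(t) }

  // Writing a file directly into the table's storage location bypasses the catalog.
  def writeOutOfBand(t: String, file: String): Unit = filesOnDisk(t) += file

  // The first scan fills the cache; later scans reuse the cached file listing.
  def scan(t: String): Seq[String] =
    cachedListing.getOrElseUpdate(t, filesOnDisk(t).toList)

  def main(args: Array[String]): Unit = {
    createTable("t")
    insertViaSql("t", "part-00000")
    assert(scan("t") == Seq("part-00000"))
    writeOutOfBand("t", "part-00001")
    assert(scan("t") == Seq("part-00000")) // stale: out-of-band file is invisible
    refreshTable("t")
    assert(scan("t") == Seq("part-00000", "part-00001")) // visible after refresh
  }
}
```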

Building a large `HadoopFsRelation` requires `stat`-ing all of its data files. 
In our environment, where we have tables with tens of thousands of partitions, 
the difference between using a cached relation and building a new one is a 
matter of seconds versus minutes. Caching partitioned table metadata vastly 
improves the usability of Spark SQL for these cases.

Author: Reynold Xin <[email protected]>
Author: Michael Allman <[email protected]>

Closes #14064 from yhuai/spark-15968-branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5d2457e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5d2457e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5d2457e

Branch: refs/heads/branch-2.0
Commit: d5d2457e41661a3402a8258026856454222a2f54
Parents: 25006c8
Author: Reynold Xin <[email protected]>
Authored: Wed Jul 6 21:35:56 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Wed Jul 6 21:35:56 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 16 ++++-
 .../hive/execution/InsertIntoHiveTable.scala    |  1 +
 .../apache/spark/sql/hive/parquetSuites.scala   | 61 ++++++++++++++------
 3 files changed, 59 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d5d2457e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d6c5325..789f94a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -187,6 +187,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
   private def getCached(
       tableIdentifier: QualifiedTableName,
+      pathsInMetastore: Seq[String],
       metastoreRelation: MetastoreRelation,
       schemaInMetastore: StructType,
       expectedFileFormat: Class[_ <: FileFormat],
@@ -196,7 +197,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
-        val pathsInMetastore = metastoreRelation.catalogTable.storage.locationUri.toSeq
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
 
         expectedFileFormat match {
@@ -261,9 +261,22 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         PartitionDirectory(values, location)
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
+      val partitionPaths = partitions.map(_.path.toString)
+
+      // By convention (for example, see MetaStorePartitionedTableFileCatalog), the definition of a
+      // partitioned table's paths depends on whether that table has any actual partitions.
+      // Partitioned tables without partitions use the location of the table's base path.
+      // Partitioned tables with partitions use the locations of those partitions' data locations,
+      // _omitting_ the table's base path.
+      val paths = if (partitionPaths.isEmpty) {
+        Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
+      } else {
+        partitionPaths
+      }
 
       val cached = getCached(
         tableIdentifier,
+        paths,
         metastoreRelation,
         metastoreSchema,
         fileFormatClass,
@@ -308,6 +321,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
 
       val cached = getCached(tableIdentifier,
+        paths,
         metastoreRelation,
         metastoreSchema,
         fileFormatClass,

http://git-wip-us.apache.org/repos/asf/spark/blob/d5d2457e/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 97cd29f..3d58d49 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -298,6 +298,7 @@ case class InsertIntoHiveTable(
 
     // Invalidate the cache.
     sqlContext.sharedState.cacheManager.invalidateCache(table)
+    sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
 
    // It would be nice to just return the childRdd unchanged so insert operations could be chained,
    // however for now we return an empty list to simplify compatibility checks with hive, which

http://git-wip-us.apache.org/repos/asf/spark/blob/d5d2457e/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index a78a285..96beb2d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -389,17 +389,18 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") {
     withTable("nonPartitioned") {
       sql(
-        s"""CREATE TABLE nonPartitioned (
-           |  key INT,
-           |  value STRING
-           |)
-           |STORED AS PARQUET
-         """.stripMargin)
+        """
+          |CREATE TABLE nonPartitioned (
+          |  key INT,
+          |  value STRING
+          |)
+          |STORED AS PARQUET
+        """.stripMargin)
 
       // First lookup fills the cache
-      val r1 = collectHadoopFsRelation (table("nonPartitioned"))
+      val r1 = collectHadoopFsRelation(table("nonPartitioned"))
       // Second lookup should reuse the cache
-      val r2 = collectHadoopFsRelation (table("nonPartitioned"))
+      val r2 = collectHadoopFsRelation(table("nonPartitioned"))
       // They should be the same instance
       assert(r1 eq r2)
     }
@@ -408,18 +409,42 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") {
     withTable("partitioned") {
       sql(
-        s"""CREATE TABLE partitioned (
-           | key INT,
-           | value STRING
-           |)
-           |PARTITIONED BY (part INT)
-           |STORED AS PARQUET
-       """.stripMargin)
+        """
+          |CREATE TABLE partitioned (
+          |  key INT,
+          |  value STRING
+          |)
+          |PARTITIONED BY (part INT)
+          |STORED AS PARQUET
+        """.stripMargin)
+
+      // First lookup fills the cache
+      val r1 = collectHadoopFsRelation(table("partitioned"))
+      // Second lookup should reuse the cache
+      val r2 = collectHadoopFsRelation(table("partitioned"))
+      // They should be the same instance
+      assert(r1 eq r2)
+    }
+  }
+
+  test("SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached " +
+      "relation") {
+    withTable("partitioned") {
+      sql(
+        """
+          |CREATE TABLE partitioned (
+          |  key INT,
+          |  value STRING
+          |)
+          |PARTITIONED BY (part INT)
+          |STORED AS PARQUET
+        """.stripMargin)
+      sql("INSERT INTO TABLE partitioned PARTITION(part=0) SELECT 1 as key, 'one' as value")
 
       // First lookup fills the cache
-      val r1 = collectHadoopFsRelation (table("partitioned"))
+      val r1 = collectHadoopFsRelation(table("partitioned"))
       // Second lookup should reuse the cache
-      val r2 = collectHadoopFsRelation (table("partitioned"))
+      val r2 = collectHadoopFsRelation(table("partitioned"))
       // They should be the same instance
       assert(r1 eq r2)
     }
@@ -557,7 +582,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
           Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
 
        // Add data files to partition directory and check whether they can be read
-        Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir)
+        sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
         checkAnswer(
           sql("SELECT * FROM test_added_partitions"),
           Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))

