Repository: spark
Updated Branches:
  refs/heads/branch-2.0 25006c8bc -> d5d2457e4

[SPARK-15968][SQL] Nonempty partitioned metastore tables are not cached

This PR backports your fix (https://github.com/apache/spark/pull/13818) to branch 2.0.

This PR addresses [SPARK-15968](https://issues.apache.org/jira/browse/SPARK-15968).

## What changes were proposed in this pull request?

The `getCached` method of [HiveMetastoreCatalog](https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala) computes `pathsInMetastore` from the metastore relation's catalog table. This only returns the table base path, which is incomplete/inaccurate for a nonempty partitioned table. As a result, cached lookups on nonempty partitioned tables always miss.

Rather than get `pathsInMetastore` from

    metastoreRelation.catalogTable.storage.locationUri.toSeq

I modified the `getCached` method to take a `pathsInMetastore` argument. Calls to this method pass in the paths computed from calls to the Hive metastore. This is how `getCached` was implemented in Spark 1.5: https://github.com/apache/spark/blob/e0c3212a9b42e3e704b070da4ac25b68c584427f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L444.

I also added a call in `InsertIntoHiveTable.scala` to invalidate the table from the SQL session catalog.

## How was this patch tested?

I've added a new unit test to `parquetSuites.scala`:

    SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached relation

Note that the only difference between this new test and the one above it in the file is that the new test populates its partitioned table with a single value, while the existing test leaves the table empty. This reveals a subtle, unexpected hole in test coverage present before this patch.

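The cache miss described under "What changes were proposed" can be illustrated with a minimal sketch in plain Scala, with no Spark dependency. The names `pathsForLookup` and `cacheHit` are hypothetical, and the set-equality comparison is an assumption about how a cached relation's paths are matched against `pathsInMetastore`; it is not a copy of `getCached`.

```scala
// Hypothetical stand-ins for illustration only; none of these are Spark APIs.
object PathSelectionSketch {
  // Paths used for the cache lookup: the partition locations when the table
  // has partitions, otherwise the table's base path.
  def pathsForLookup(basePath: String, partitionPaths: Seq[String]): Seq[String] =
    if (partitionPaths.isEmpty) Seq(basePath) else partitionPaths

  // Assumption: a cached relation is reused only when it was built over
  // exactly the same set of paths as the metastore reports.
  def cacheHit(cachedPaths: Seq[String], pathsInMetastore: Seq[String]): Boolean =
    cachedPaths.toSet == pathsInMetastore.toSet

  def main(args: Array[String]): Unit = {
    val base = "/warehouse/partitioned"
    val partitions = Seq(s"$base/part=0")

    // The cached relation for a nonempty partitioned table covers its partitions.
    val cachedPaths = pathsForLookup(base, partitions)

    // Comparing against the base path alone never matches: prints false.
    println(cacheHit(cachedPaths, Seq(base)))
    // Comparing against the partition paths matches: prints true.
    println(cacheHit(cachedPaths, pathsForLookup(base, partitions)))
  }
}
```

For an empty partitioned table both sides reduce to the base path, which would explain why the existing empty-table test passed before this patch.
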
Note I also modified a different but related unit test in `parquetSuites.scala`:

    SPARK-15248: explicitly added partitions should be readable

This unit test asserts that Spark SQL should return data from a table partition which has been placed there outside a metastore query immediately after it is added. I changed the test so that, instead of adding the data as a parquet file saved in the partition's location, the data is added through a SQL `INSERT` query. I made this change because I could find no way to efficiently support partitioned table caching without failing that test.

In addition to my primary motivation, I can offer a few reasons I believe this is an acceptable weakening of that test. First, it still validates a fix for [SPARK-15248](https://issues.apache.org/jira/browse/SPARK-15248), the issue for which it was written. Second, the assertion made is stronger than that required for non-partitioned tables. If you write data to the storage location of a non-partitioned metastore table without using a proper SQL DML query, a subsequent call to show that data will not return it. I believe this is an intentional limitation put in place to make table caching feasible, but I'm only speculating.

Building a large `HadoopFsRelation` requires `stat`-ing all of its data files. In our environment, where we have tables with tens of thousands of partitions, the difference between using a cached relation versus a new one is a matter of seconds versus minutes. Caching partitioned table metadata vastly improves the usability of Spark SQL for these cases.

Author: Reynold Xin <[email protected]>
Author: Michael Allman <[email protected]>

Closes #14064 from yhuai/spark-15968-branch-2.0.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5d2457e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5d2457e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5d2457e

Branch: refs/heads/branch-2.0
Commit: d5d2457e41661a3402a8258026856454222a2f54
Parents: 25006c8
Author: Reynold Xin <[email protected]>
Authored: Wed Jul 6 21:35:56 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Wed Jul 6 21:35:56 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 16 ++++-
 .../hive/execution/InsertIntoHiveTable.scala    |  1 +
 .../apache/spark/sql/hive/parquetSuites.scala   | 61 ++++++++++++++------
 3 files changed, 59 insertions(+), 19 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/d5d2457e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d6c5325..789f94a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -187,6 +187,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
   private def getCached(
       tableIdentifier: QualifiedTableName,
+      pathsInMetastore: Seq[String],
       metastoreRelation: MetastoreRelation,
       schemaInMetastore: StructType,
       expectedFileFormat: Class[_ <: FileFormat],
@@ -196,7 +197,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
-        val pathsInMetastore = metastoreRelation.catalogTable.storage.locationUri.toSeq
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
 
         expectedFileFormat match {
@@ -261,9 +261,22 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           PartitionDirectory(values, location)
         }
         val partitionSpec = PartitionSpec(partitionSchema, partitions)
+        val partitionPaths = partitions.map(_.path.toString)
+
+        // By convention (for example, see MetaStorePartitionedTableFileCatalog), the definition of a
+        // partitioned table's paths depends on whether that table has any actual partitions.
+        // Partitioned tables without partitions use the location of the table's base path.
+        // Partitioned tables with partitions use the locations of those partitions' data locations,
+        // _omitting_ the table's base path.
+        val paths = if (partitionPaths.isEmpty) {
+          Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
+        } else {
+          partitionPaths
+        }
 
         val cached = getCached(
           tableIdentifier,
+          paths,
           metastoreRelation,
           metastoreSchema,
           fileFormatClass,
@@ -308,6 +321,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
 
         val cached = getCached(tableIdentifier,
+          paths,
           metastoreRelation,
           metastoreSchema,
           fileFormatClass,

http://git-wip-us.apache.org/repos/asf/spark/blob/d5d2457e/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 97cd29f..3d58d49 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -298,6 +298,7 @@ case class InsertIntoHiveTable(
 
     // Invalidate the cache.
     sqlContext.sharedState.cacheManager.invalidateCache(table)
+    sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
 
     // It would be nice to just return the childRdd unchanged so insert operations could be chained,
     // however for now we return an empty list to simplify compatibility checks with hive, which

http://git-wip-us.apache.org/repos/asf/spark/blob/d5d2457e/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index a78a285..96beb2d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -389,17 +389,18 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") {
     withTable("nonPartitioned") {
       sql(
-        s"""CREATE TABLE nonPartitioned (
-           |  key INT,
-           |  value STRING
-           |)
-           |STORED AS PARQUET
-         """.stripMargin)
+        """
+          |CREATE TABLE nonPartitioned (
+          |  key INT,
+          |  value STRING
+          |)
+          |STORED AS PARQUET
+        """.stripMargin)
 
       // First lookup fills the cache
-      val r1 = collectHadoopFsRelation (table("nonPartitioned"))
+      val r1 = collectHadoopFsRelation(table("nonPartitioned"))
       // Second lookup should reuse the cache
-      val r2 = collectHadoopFsRelation (table("nonPartitioned"))
+      val r2 = collectHadoopFsRelation(table("nonPartitioned"))
       // They should be the same instance
       assert(r1 eq r2)
     }
@@ -408,18 +409,42 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") {
     withTable("partitioned") {
       sql(
-        s"""CREATE TABLE partitioned (
-           |  key INT,
-           |  value STRING
-           |)
-           |PARTITIONED BY (part INT)
-           |STORED AS PARQUET
-         """.stripMargin)
+        """
+          |CREATE TABLE partitioned (
+          |  key INT,
+          |  value STRING
+          |)
+          |PARTITIONED BY (part INT)
+          |STORED AS PARQUET
+        """.stripMargin)
+
+      // First lookup fills the cache
+      val r1 = collectHadoopFsRelation(table("partitioned"))
+      // Second lookup should reuse the cache
+      val r2 = collectHadoopFsRelation(table("partitioned"))
+      // They should be the same instance
+      assert(r1 eq r2)
+    }
+  }
+
+  test("SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached " +
+      "relation") {
+    withTable("partitioned") {
+      sql(
+        """
+          |CREATE TABLE partitioned (
+          |  key INT,
+          |  value STRING
+          |)
+          |PARTITIONED BY (part INT)
+          |STORED AS PARQUET
+        """.stripMargin)
+      sql("INSERT INTO TABLE partitioned PARTITION(part=0) SELECT 1 as key, 'one' as value")
 
       // First lookup fills the cache
-      val r1 = collectHadoopFsRelation (table("partitioned"))
+      val r1 = collectHadoopFsRelation(table("partitioned"))
       // Second lookup should reuse the cache
-      val r2 = collectHadoopFsRelation (table("partitioned"))
+      val r2 = collectHadoopFsRelation(table("partitioned"))
       // They should be the same instance
       assert(r1 eq r2)
     }
@@ -557,7 +582,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
 
       // Add data files to partition directory and check whether they can be read
-      Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir)
+      sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
       checkAnswer(
         sql("SELECT * FROM test_added_partitions"),
         Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
