Repository: spark
Updated Branches:
  refs/heads/master 941b3f9ac -> 5f20ae039


[SPARK-17980][SQL] Fix refreshByPath for converted Hive tables

## What changes were proposed in this pull request?

A bug introduced in https://github.com/apache/spark/pull/14690 broke 
refreshByPath for converted Hive tables (though it turns out that refreshing 
converted Hive tables was already very difficult, since you had to specify 
the exact path of one of the partitions).

This change makes refreshByPath invalidate every cached relation whose root 
path starts with the given path (prefix match) instead of requiring an exact 
match, which fixes the issue.

cc sameeragarwal for refreshByPath changes
mallman

## How was this patch tested?

Extended unit test.

Author: Eric Liang <e...@databricks.com>

Closes #15521 from ericl/fix-caching.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f20ae03
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f20ae03
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f20ae03

Branch: refs/heads/master
Commit: 5f20ae0394388574a3767daf7f499c89658f61e1
Parents: 941b3f9
Author: Eric Liang <e...@databricks.com>
Authored: Wed Oct 19 10:20:12 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Oct 19 10:20:12 2016 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalog/Catalog.scala  |  3 ++-
 .../spark/sql/execution/CacheManager.scala      |  5 +++--
 .../datasources/TableFileCatalog.scala          | 18 +++++++++++++----
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  2 +-
 .../spark/sql/hive/HiveMetadataCacheSuite.scala | 21 ++++++++++++++++++--
 5 files changed, 39 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5f20ae03/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 889b8a0..aecdda1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -343,7 +343,8 @@ abstract class Catalog {
 
   /**
    * Invalidate and refresh all the cached data (and the associated metadata) 
for any dataframe that
-   * contains the given data source path.
+   * contains the given data source path. Path matching is by prefix, i.e. "/" 
would invalidate
+   * everything that is cached.
    *
    * @since 2.0.0
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/5f20ae03/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 92fd366..fb72c67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -185,9 +185,10 @@ class CacheManager extends Logging {
     plan match {
       case lr: LogicalRelation => lr.relation match {
         case hr: HadoopFsRelation =>
+          val prefixToInvalidate = qualifiedPath.toString
           val invalidate = hr.location.rootPaths
-            .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
-            .contains(qualifiedPath)
+            .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
+            .exists(_.startsWith(prefixToInvalidate))
           if (invalidate) hr.location.refresh()
           invalidate
         case _ => false

http://git-wip-us.apache.org/repos/asf/spark/blob/5f20ae03/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index 5648ab4..fc08c37 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -48,13 +48,18 @@ class TableFileCatalog(
 
   private val baseLocation = catalogTable.storage.locationUri
 
+  // Populated on demand by calls to allPartitions; reset to null by refresh()
+  private var cachedAllPartitions: ListingFileCatalog = null
+
   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
 
   override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
     filterPartitions(filters).listFiles(Nil)
   }
 
-  override def refresh(): Unit = {}
+  override def refresh(): Unit = synchronized {
+    cachedAllPartitions = null
+  }
 
   /**
    * Returns a [[ListingFileCatalog]] for this table restricted to the subset 
of partitions
@@ -64,7 +69,7 @@ class TableFileCatalog(
    */
   def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
     if (filters.isEmpty) {
-      cachedAllPartitions
+      allPartitions
     } else {
       filterPartitions0(filters)
     }
@@ -89,9 +94,14 @@ class TableFileCatalog(
   }
 
   // Not used in the hot path of queries when metastore partition pruning is 
enabled
-  lazy val cachedAllPartitions: ListingFileCatalog = filterPartitions0(Nil)
+  def allPartitions: ListingFileCatalog = synchronized {
+    if (cachedAllPartitions == null) {
+      cachedAllPartitions = filterPartitions0(Nil)
+    }
+    cachedAllPartitions
+  }
 
-  override def inputFiles: Array[String] = cachedAllPartitions.inputFiles
+  override def inputFiles: Array[String] = allPartitions.inputFiles
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/5f20ae03/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 16e1e37..c909eb5 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -235,7 +235,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
           if (lazyPruningEnabled) {
             catalog
           } else {
-            catalog.cachedAllPartitions
+            catalog.allPartitions
           }
         }
         val partitionSchemaColumnNames = 
partitionSchema.map(_.name.toLowerCase).toSet

http://git-wip-us.apache.org/repos/asf/spark/blob/5f20ae03/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 7af81a3..2ca1cd4 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -80,9 +80,13 @@ class HiveMetadataCacheSuite extends QueryTest with 
SQLTestUtils with TestHiveSi
             val df = spark.sql("select * from test")
             assert(sql("select * from test").count() == 5)
 
+            def deleteRandomFile(): Unit = {
+              val p = new Path(spark.table("test").inputFiles.head)
+              
assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, 
true))
+            }
+
             // Delete a file, then assert that we tried to read it. This means 
the table was cached.
-            val p = new Path(spark.table("test").inputFiles.head)
-            
assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, 
true))
+            deleteRandomFile()
             val e = intercept[SparkException] {
               sql("select * from test").count()
             }
@@ -91,6 +95,19 @@ class HiveMetadataCacheSuite extends QueryTest with 
SQLTestUtils with TestHiveSi
             // Test refreshing the cache.
             spark.catalog.refreshTable("test")
             assert(sql("select * from test").count() == 4)
+            assert(spark.table("test").inputFiles.length == 4)
+
+            // Test refresh by path separately since it goes through different 
code paths than
+            // refreshTable does.
+            deleteRandomFile()
+            spark.catalog.cacheTable("test")
+            spark.catalog.refreshByPath("/some-invalid-path")  // no-op
+            val e2 = intercept[SparkException] {
+              sql("select * from test").count()
+            }
+            assert(e2.getMessage.contains("FileNotFoundException"))
+            spark.catalog.refreshByPath(dir.getAbsolutePath)
+            assert(sql("select * from test").count() == 3)
           }
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to