Repository: spark
Updated Branches:
  refs/heads/master 440ea31b7 -> c42c3fc7f


[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai <yh...@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Cheng Lian <l...@databricks.com>

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

b0e1a42 [Yin Huai] Address comments.
83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.
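
For context: cachedDataSourceTables in HiveMetastoreCatalog is a loading cache (Guava-style) keyed by QualifiedTableName, as the comments in the diff below reference its load function and cache loader. The sketch that follows is a minimal, hypothetical illustration of the invalidate-rather-than-reload semantics this patch relies on; the names TableKey, Relation, and tableCache are made up for the example and are not Spark APIs.

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object CacheSemanticsSketch {
  // Hypothetical stand-ins for QualifiedTableName and the cached LogicalRelation.
  case class TableKey(database: String, table: String)
  case class Relation(paths: Set[String])

  // A loading cache in the spirit of cachedDataSourceTables: entries are built
  // lazily by the loader on first access.
  val tableCache: LoadingCache[TableKey, Relation] =
    CacheBuilder.newBuilder()
      .maximumSize(1000)
      .build(new CacheLoader[TableKey, Relation] {
        override def load(key: TableKey): Relation = {
          // In the real catalog the loader only knows how to build relations for
          // data source tables; converted Parquet relations are inserted directly
          // in convertToParquetRelation, which is why refreshTable must invalidate
          // instead of reloading through this loader.
          Relation(paths = Set(s"/warehouse/${key.database}/${key.table}"))
        }
      })

  def refreshTable(database: String, table: String): Unit = {
    // Mirrors the patched behaviour: drop the entry now, repopulate on next use.
    tableCache.invalidate(TableKey(database, table))
  }
}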


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c42c3fc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c42c3fc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c42c3fc7

Branch: refs/heads/master
Commit: c42c3fc7f7b79a1f6ce990d39b5d9d14ab19fcf0
Parents: 440ea31
Author: Yin Huai <yh...@databricks.com>
Authored: Fri Apr 3 14:40:36 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Fri Apr 3 14:40:36 2015 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 28 ++++++++++++++------
 .../spark/sql/hive/execution/commands.scala     |  5 ++--
 .../apache/spark/sql/hive/parquetSuites.scala   |  2 --
 3 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c42c3fc7/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 76d329a..c4da34a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -116,8 +116,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    // refresh table does not eagerly reload the cache. It just invalidate the cache.
+    // refreshTable does not eagerly reload the cache. It just invalidates the cache.
     // Next time when we use the table, it will be populated in the cache.
+    // Since we also cache ParquetRelations converted from Hive Parquet tables and
+    // adding converted ParquetRelations into the cache is not defined in the load function
+    // of the cache (instead, we add the cache entry in convertToParquetRelation),
+    // it is better to invalidate the cache here to avoid confusing warning logs from the
+    // cache loader (e.g. cannot find a data source provider, which is only defined for
+    // data source tables).
     invalidateTable(databaseName, tableName)
   }
 
@@ -242,21 +248,27 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
 
     def getCached(
-      tableIdentifier: QualifiedTableName,
-      pathsInMetastore: Seq[String],
-      schemaInMetastore: StructType,
-      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+        tableIdentifier: QualifiedTableName,
+        pathsInMetastore: Seq[String],
+        schemaInMetastore: StructType,
+        partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+        case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =
-            parquetRelation.paths == pathsInMetastore &&
+            parquetRelation.paths.toSet == pathsInMetastore.toSet &&
             logical.schema.sameType(metastoreSchema) &&
             parquetRelation.maybePartitionSpec == partitionSpecInMetastore
 
-          if (useCached) Some(logical) else None
+          if (useCached) {
+            Some(logical)
+          } else {
+            // If the cached relation is not updated, we invalidate it right away.
+            cachedDataSourceTables.invalidate(tableIdentifier)
+            None
+          }
         case other =>
           logWarning(
             s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} shold be stored " +
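
The functional change in getCached above is that the directory paths are now compared as sets, so a cached relation is not discarded just because the metastore lists the same partition directories in a different order, and a stale entry is invalidated immediately rather than left behind. Below is a standalone sketch of that comparison; CachedEntry and its fields are hypothetical, not Spark types.

object PathComparisonSketch {
  // Hypothetical stand-in for the cached relation's metadata.
  case class CachedEntry(
      paths: Seq[String],
      schemaMatchesMetastore: Boolean,
      partitionSpec: Option[String])

  def useCached(
      cached: CachedEntry,
      pathsInMetastore: Seq[String],
      partitionSpecInMetastore: Option[String]): Boolean = {
    // Compare paths as sets so ordering differences do not defeat the cache.
    cached.paths.toSet == pathsInMetastore.toSet &&
      cached.schemaMatchesMetastore &&
      cached.partitionSpec == partitionSpecInMetastore
  }

  def main(args: Array[String]): Unit = {
    val cached = CachedEntry(Seq("/t/p=2", "/t/p=1"), schemaMatchesMetastore = true, partitionSpec = None)
    // The same directories listed in a different order still count as a cache hit.
    assert(useCached(cached, Seq("/t/p=1", "/t/p=2"), None))
  }
}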

http://git-wip-us.apache.org/repos/asf/spark/blob/c42c3fc7/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 4345ffb..99dc586 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -58,12 +58,13 @@ case class DropTable(
     try {
       hiveContext.cacheManager.tryUncacheQuery(hiveContext.table(tableName))
     } catch {
-      // This table's metadata is not in
+      // This table's metadata is not in Hive metastore (e.g. the table does not exist).
       case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
+      case _: org.apache.spark.sql.catalyst.analysis.NoSuchTableException =>
       // Other Throwables can be caused by users providing wrong parameters in OPTIONS
       // (e.g. invalid paths). We catch it and log a warning message.
       // Users should be able to drop such kinds of tables regardless if there is an error.
-      case e: Throwable => log.warn(s"${e.getMessage}")
+      case e: Throwable => log.warn(s"${e.getMessage}", e)
     }
     hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
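
The commands.scala change widens DropTable's "table metadata is missing" handling to also cover NoSuchTableException, and logs unexpected failures with their stack trace instead of only the message. The following is a simplified sketch of that pattern, with plain RuntimeException subclasses standing in for the Hive and Catalyst exception types.

import org.slf4j.LoggerFactory

object DropTableSketch {
  private val log = LoggerFactory.getLogger(getClass)

  // Hypothetical stand-ins for the two "metadata is missing" exceptions.
  class InvalidTableException(msg: String) extends RuntimeException(msg)
  class NoSuchTableException(msg: String) extends RuntimeException(msg)

  def tryUncache(uncache: () => Unit): Unit = {
    try {
      uncache()
    } catch {
      // The table's metadata is not in the metastore: nothing to uncache.
      case _: InvalidTableException =>
      case _: NoSuchTableException =>
      // Anything else (e.g. bad OPTIONS paths) is logged with its stack trace,
      // and the DROP TABLE is still allowed to proceed.
      case e: Throwable => log.warn(e.getMessage, e)
    }
  }
}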

http://git-wip-us.apache.org/repos/asf/spark/blob/c42c3fc7/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 2ad6e86..1319c81 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -473,7 +473,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     // Right now, insert into a partitioned Parquet is not supported in data source Parquet.
     // So, we expect it is not cached.
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
-    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
     sql(
       """
         |INSERT INTO TABLE test_parquet_partitioned_cache_test
@@ -481,7 +480,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
         |select a, b from jt
       """.stripMargin)
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
-    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
 
     // Make sure we can cache the partitioned table.
     table("test_parquet_partitioned_cache_test")
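
For context on the assertions in this suite: Guava's getIfPresent returns null on a cache miss, and only a read through the loading cache (or an explicit put) populates an entry. The tiny sketch below illustrates that pattern in isolation; it is independent of Spark's actual catalog and its names are made up.

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object CacheAssertionSketch {
  // Minimal cache keyed by "db.table"; the loader fabricates a placeholder value.
  private val cache: LoadingCache[String, String] =
    CacheBuilder.newBuilder().build(new CacheLoader[String, String] {
      override def load(key: String): String = s"relation for $key"
    })

  def main(args: Array[String]): Unit = {
    val key = "default.test_parquet_partitioned_cache_test"

    // getIfPresent returns null on a miss, which is what the suite asserts
    // right after the INSERT statements above.
    assert(cache.getIfPresent(key) == null)

    // Reading through the loading cache (analogous to table(...)) populates it.
    cache.get(key)
    assert(cache.getIfPresent(key) != null)
  }
}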


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
