Repository: spark
Updated Branches:
  refs/heads/master 871f6114a -> 35e974076


[SPARK-19028][SQL] Fixed non-thread-safe functions used in SessionCatalog

### What changes were proposed in this pull request?
Fixed non-thread-safe functions used in SessionCatalog:
- refreshTable
- lookupRelation

### How was this patch tested?
N/A

Author: gatorsmile <gatorsm...@gmail.com>

Closes #16437 from gatorsmile/addSyncToLookUpTable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35e97407
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35e97407
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35e97407

Branch: refs/heads/master
Commit: 35e974076dcbc5afde8d4259ce88cb5f29d94920
Parents: 871f611
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Sat Dec 31 19:40:28 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Sat Dec 31 19:40:28 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |  2 +-
 .../spark/sql/hive/HiveSessionCatalog.scala     | 36 +++++++++++---------
 2 files changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/35e97407/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e996a83..741ed05 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -634,7 +634,7 @@ class SessionCatalog(
   /**
    * Refresh the cache entry for a metastore table, if any.
    */
-  def refreshTable(name: TableIdentifier): Unit = {
+  def refreshTable(name: TableIdentifier): Unit = synchronized {
     // Go through temporary tables and invalidate them.
     // If the database is defined, this is definitely not a temp table.
     // If the database is not defined, there is a good chance this is a temp table.

http://git-wip-us.apache.org/repos/asf/spark/blob/35e97407/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 08bf1cd..462b3c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -56,23 +56,25 @@ private[sql] class HiveSessionCatalog(
     hadoopConf) {
 
   override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = {
-    val table = formatTableName(name.table)
-    val db = formatDatabaseName(name.database.getOrElse(currentDb))
-    if (db == globalTempViewManager.database) {
-      val relationAlias = alias.getOrElse(table)
-      globalTempViewManager.get(table).map { viewDef =>
-        SubqueryAlias(relationAlias, viewDef, Some(name))
-      }.getOrElse(throw new NoSuchTableException(db, table))
-    } else if (name.database.isDefined || !tempTables.contains(table)) {
-      val database = name.database.map(formatDatabaseName)
-      val newName = name.copy(database = database, table = table)
-      metastoreCatalog.lookupRelation(newName, alias)
-    } else {
-      val relation = tempTables(table)
-      val tableWithQualifiers = SubqueryAlias(table, relation, None)
-      // If an alias was specified by the lookup, wrap the plan in a subquery so that
-      // attributes are properly qualified with this alias.
-      alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers)
+    synchronized {
+      val table = formatTableName(name.table)
+      val db = formatDatabaseName(name.database.getOrElse(currentDb))
+      if (db == globalTempViewManager.database) {
+        val relationAlias = alias.getOrElse(table)
+        globalTempViewManager.get(table).map { viewDef =>
+          SubqueryAlias(relationAlias, viewDef, Some(name))
+        }.getOrElse(throw new NoSuchTableException(db, table))
+      } else if (name.database.isDefined || !tempTables.contains(table)) {
+        val database = name.database.map(formatDatabaseName)
+        val newName = name.copy(database = database, table = table)
+        metastoreCatalog.lookupRelation(newName, alias)
+      } else {
+        val relation = tempTables(table)
+        val tableWithQualifiers = SubqueryAlias(table, relation, None)
+        // If an alias was specified by the lookup, wrap the plan in a subquery so that
+        // attributes are properly qualified with this alias.
+        alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers)
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to