IMPALA-4449: Revisit table locking pattern in the catalog

This commit fixes an issue where multiple long-running operations on the
same catalog object (e.g. table) can block other catalog operations from
making progress.

Problem:
IMPALA-1480 introduced table-level locking that, in conjunction with the
global catalog lock, ensures serialized access to catalog table objects.
In some cases (e.g. multiple long-running operations on the same table),
the locking pattern used resulted in the catalog lock being held for
a long period of time, blocking other catalog operations from making
any progress. That resulted in high response times and the system
appearing to be hung.

Solution:
Change the locking pattern used in the catalog to protect table objects
so that no operation holds the catalog lock for a long time while
waiting on a table lock. An operation that attempts to acquire a table
lock and fails must release the catalog lock and retry. The use of fair
table locks prevents starvation. The only operation that doesn't follow
this retry logic is the getCatalogObjects() call, which retrieves a
snapshot of the catalog metadata for transmission to the statestore.
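
In outline, the new primitive and the caller-side protocol look as
follows (condensed from CatalogServiceCatalog.tryLockTable() and its
call sites in the diff below; error messages and logging elided):

  public boolean tryLockTable(Table tbl) {
    long begin = System.currentTimeMillis();
    do {
      catalogLock_.writeLock().lock();
      // Non-blocking attempt; on success both locks are held.
      if (tbl.getLock().tryLock()) return true;
      // Release the catalog lock before sleeping so that other
      // operations can make progress.
      catalogLock_.writeLock().unlock();
      try {
        Thread.sleep(TBL_LOCK_RETRY_MS);
      } catch (InterruptedException e) { /* ignore */ }
    } while (System.currentTimeMillis() - begin < TBL_LOCK_TIMEOUT_MS);
    return false;  // timed out; no lock is held
  }

  // Caller side: bump the catalog version while both locks are held,
  // release the catalog lock early, then modify metadata under the
  // table lock alone.
  if (!tryLockTable(tbl)) {
    throw new CatalogException("... due to lock contention");
  }
  try {
    long newCatalogVersion = incrementAndGetCatalogVersion();
    catalogLock_.writeLock().unlock();
    // ... modify table metadata ...
    tbl.setCatalogVersion(newCatalogVersion);
  } finally {
    tbl.getLock().unlock();
  }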

Testing:
I manually tested this change by running concurrency tests with JMeter
and verified that the throughput of catalog operations on a specific
table is not affected by concurrent long-running operations (e.g.
refresh) on a different table.
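
The property being tested can also be demonstrated with a
self-contained toy program (illustration only, not part of the patch;
the class, lock fields, and timings below are hypothetical and merely
model CatalogServiceCatalog): a long-running operation holding one
table's fair lock must not stall short operations on a different table.

  import java.util.concurrent.atomic.AtomicLong;
  import java.util.concurrent.locks.ReentrantLock;
  import java.util.concurrent.locks.ReentrantReadWriteLock;

  public class LockPatternDemo {
    // Global catalog lock plus one fair lock per table, as in the patch.
    static final ReentrantReadWriteLock catalogLock = new ReentrantReadWriteLock();
    static final ReentrantLock tableA = new ReentrantLock(true);
    static final ReentrantLock tableB = new ReentrantLock(true);

    // The retry pattern: never block on a table lock while holding the
    // catalog lock. On success both locks are held.
    static boolean tryLockTable(ReentrantLock tblLock, long timeoutMs)
        throws InterruptedException {
      long begin = System.currentTimeMillis();
      do {
        catalogLock.writeLock().lock();
        if (tblLock.tryLock()) return true;
        catalogLock.writeLock().unlock();
        Thread.sleep(10);
      } while (System.currentTimeMillis() - begin < timeoutMs);
      return false;
    }

    public static void main(String[] args) throws Exception {
      // A long-running operation on table A (e.g. a slow refresh).
      Thread slow = new Thread(() -> {
        try {
          if (!tryLockTable(tableA, 1000)) return;
          catalogLock.writeLock().unlock();  // drop the global lock early
          try {
            Thread.sleep(5000);              // hold only the table lock
          } finally {
            tableA.unlock();
          }
        } catch (InterruptedException ignored) {}
      });
      // A second operation on table A. Under the old pattern this thread
      // would block on A's monitor while holding the catalog lock,
      // stalling every other catalog operation; with the retry pattern
      // it backs off and releases the catalog lock between attempts.
      Thread contender = new Thread(() -> {
        try {
          if (tryLockTable(tableA, 6000)) {
            catalogLock.writeLock().unlock();
            tableA.unlock();
          }
        } catch (InterruptedException ignored) {}
      });
      // Short operations on table B; they should proceed unhindered.
      AtomicLong opsOnB = new AtomicLong();
      Thread fast = new Thread(() -> {
        long deadline = System.currentTimeMillis() + 5000;
        try {
          while (System.currentTimeMillis() < deadline) {
            if (!tryLockTable(tableB, 1000)) continue;
            catalogLock.writeLock().unlock();
            tableB.unlock();
            opsOnB.incrementAndGet();
          }
        } catch (InterruptedException ignored) {}
      });
      slow.start();
      Thread.sleep(100);  // let the slow operation take A's lock first
      contender.start();
      fast.start();
      slow.join(); contender.join(); fast.join();
      System.out.println("Ops on table B while A was busy: " + opsOnB.get());
    }
  }

Under the old pattern the contender would sit on the catalog lock for
the full five seconds and opsOnB would stay near zero; with the retry
pattern table B completes many thousands of operations.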

Change-Id: Id08e21da31deb1f003b3cada4517651f3b3b2bb2
Reviewed-on: http://gerrit.cloudera.org:8080/5710
Reviewed-by: Dimitris Tsirogiannis <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3426a049
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3426a049
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3426a049

Branch: refs/heads/master
Commit: 3426a04952eb5ee906b732b4644ce77c0d24fa7e
Parents: fe2be25
Author: Dimitris Tsirogiannis <[email protected]>
Authored: Fri Jan 13 09:55:56 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Jan 20 21:13:37 2017 +0000

----------------------------------------------------------------------
 .../impala/catalog/CatalogServiceCatalog.java   |  66 ++++++-
 .../java/org/apache/impala/catalog/Table.java   |   4 +
 .../impala/service/CatalogOpExecutor.java       | 186 ++++++++++++-------
 3 files changed, 183 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3426a049/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 31001a2..2c42874 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -180,6 +180,42 @@ public class CatalogServiceCatalog extends Catalog {
     localLibraryPath_ = new String("file://" + localLibraryPath);
   }
 
+  // Timeout for acquiring a table lock
+  // TODO: Make this configurable
+  private static final long TBL_LOCK_TIMEOUT_MS = 7200000;
+  // Time to sleep before retrying to acquire a table lock
+  private static final int TBL_LOCK_RETRY_MS = 10;
+
+  /**
+   * Tries to acquire catalogLock_ and the lock of 'tbl' in that order. Returns true if it
+   * successfully acquires both within TBL_LOCK_TIMEOUT_MS millisecs; both locks are held
+   * when the function returns. Returns false otherwise and no lock is held in this case.
+   */
+  public boolean tryLockTable(Table tbl) {
+    long begin = System.currentTimeMillis();
+    long end;
+    do {
+      catalogLock_.writeLock().lock();
+      if (tbl.getLock().tryLock()) {
+        if (LOG.isTraceEnabled()) {
+          end = System.currentTimeMillis();
+          LOG.trace(String.format("Lock for table %s was acquired in %d msec",
+              tbl.getFullName(), end - begin));
+        }
+        return true;
+      }
+      catalogLock_.writeLock().unlock();
+      try {
+        // Sleep to avoid spinning and allow other operations to make progress.
+        Thread.sleep(TBL_LOCK_RETRY_MS);
+      } catch (InterruptedException e) {
+        // ignore
+      }
+      end = System.currentTimeMillis();
+    } while (end - begin < TBL_LOCK_TIMEOUT_MS);
+    return false;
+  }
+
   /**
    * Reads the current set of cache pools from HDFS and updates the catalog.
    * Called periodically by the cachePoolReader_.
@@ -290,7 +326,8 @@ public class CatalogServiceCatalog extends Catalog {
           }
 
           // Protect the table from concurrent modifications.
-          synchronized(tbl) {
+          tbl.getLock().lock();
+          try {
             // Only add the extended metadata if this table's version is >=
             // the fromVersion.
             if (tbl.getCatalogVersion() >= fromVersion) {
@@ -307,6 +344,8 @@ public class CatalogServiceCatalog extends Catalog {
             } else {
               catalogTbl.setTable(new TTable(db.getName(), tblName));
             }
+          } finally {
+            tbl.getLock().unlock();
           }
           resp.addToObjects(catalogTbl);
         }
@@ -919,8 +958,11 @@ public class CatalogServiceCatalog extends Catalog {
       }
     }
 
-    catalogLock_.writeLock().lock();
-    synchronized(tbl) {
+    if (!tryLockTable(tbl)) {
+      throw new CatalogException(String.format("Error refreshing metadata for table " +
+          "%s due to lock contention", tbl.getFullName()));
+    }
+    try {
       long newCatalogVersion = incrementAndGetCatalogVersion();
       catalogLock_.writeLock().unlock();
       try (MetaStoreClient msClient = getMetaStoreClient()) {
@@ -937,6 +979,9 @@ public class CatalogServiceCatalog extends Catalog {
       tbl.setCatalogVersion(newCatalogVersion);
       LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
       return tbl;
+    } finally {
+      Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread());
+      tbl.getLock().unlock();
     }
   }
 
@@ -958,7 +1003,7 @@ public class CatalogServiceCatalog extends Catalog {
       throws CatalogException {
     Preconditions.checkNotNull(tbl);
     Preconditions.checkNotNull(partitionSet);
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
     if (!(tbl instanceof HdfsTable)) {
       throw new CatalogException("Table " + tbl.getFullName() + " is not an Hdfs table");
     }
@@ -979,7 +1024,7 @@ public class CatalogServiceCatalog extends Catalog {
       throws CatalogException {
     Preconditions.checkNotNull(tbl);
     Preconditions.checkNotNull(partitionSpec);
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
     if (!(tbl instanceof HdfsTable)) {
       throw new CatalogException("Table " + tbl.getFullName() + " is not an Hdfs table");
     }
@@ -994,7 +1039,6 @@ public class CatalogServiceCatalog extends Catalog {
   public Table addPartition(HdfsPartition partition) throws CatalogException {
     Preconditions.checkNotNull(partition);
     HdfsTable hdfsTable = partition.getTable();
-    Db db = getDb(hdfsTable.getDb().getName());
     hdfsTable.addPartition(partition);
     return hdfsTable;
   }
@@ -1251,8 +1295,11 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public Table reloadPartition(Table tbl, List<TPartitionKeyValue> partitionSpec)
       throws CatalogException {
-    catalogLock_.writeLock().lock();
-    synchronized (tbl) {
+    if (!tryLockTable(tbl)) {
+      throw new CatalogException(String.format("Error reloading partition of table %s " +
+          "due to lock contention", tbl.getFullName()));
+    }
+    try {
       long newCatalogVersion = incrementAndGetCatalogVersion();
       catalogLock_.writeLock().unlock();
       HdfsTable hdfsTable = (HdfsTable) tbl;
@@ -1288,6 +1335,9 @@ public class CatalogServiceCatalog extends Catalog {
       LOG.info(String.format("Refreshed partition metadata: %s %s",
           hdfsTable.getFullName(), partitionName));
       return hdfsTable;
+    } finally {
+      Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread());
+      tbl.getLock().unlock();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3426a049/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 9919dbf..01a4e55 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog;
 
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -67,6 +68,8 @@ public abstract class Table implements CatalogObject {
   protected final String owner_;
   protected TTableDescriptor tableDesc_;
   protected TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE;
+  // Lock protecting this table
+  private final ReentrantLock tableLock_ = new ReentrantLock(true);
 
   // Number of clustering columns.
   protected int numClusteringCols_;
@@ -101,6 +104,7 @@ public abstract class Table implements CatalogObject {
         CatalogServiceCatalog.getLastDdlTime(msTable_) : -1;
   }
 
+  public ReentrantLock getLock() { return tableLock_; }
   public abstract TTableDescriptor toThriftDescriptor(
       int tableId, Set<Long> referencedPartitions);
   public abstract TCatalogObjectType getCatalogObjectType();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3426a049/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index fce6e07..3c1dad8 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -169,13 +169,31 @@ import com.google.common.collect.Sets;
  *
  * The following locking protocol is employed to ensure that modifying
  * the table metadata and assigning a new catalog version is performed atomically and
- * consistently in the presence of concurrent DDL operations:
- * 1. Acquire the catalog lock (see CatalogServiceCatalog.catalogLock_)
- * 2. Acquire a table lock
- * 3. Increment and get a new catalog version
- * 4. Release the catalog lock
- * 5. Modify table metadata
- * 6. Release table lock
+ * consistently in the presence of concurrent DDL operations. The following pattern
+ * ensures that the catalog lock is never held for a long period of time, preventing
+ * other DDL operations from making progress. This pattern only applies to single-table
+ * update operations and requires the use of fair table locks to prevent starvation.
+ *
+ *   DO {
+ *     Acquire the catalog lock (see CatalogServiceCatalog.catalogLock_)
+ *     Try to acquire a table lock
+ *     IF the table lock acquisition fails {
+ *       Release the catalog lock
+ *       YIELD()
+ *     ELSE
+ *       BREAK
+ *   } WHILE (TIMEOUT);
+ *
+ *   If (TIMEOUT) report error
+ *
+ *   Increment and get a new catalog version
+ *   Release the catalog lock
+ *   Modify table metadata
+ *   Release table lock
+ *
+ * Note: The getCatalogObjects() function is the only case where this locking pattern is
+ * not used since it accesses multiple catalog entities in order to compute a snapshot
+ * of catalog metadata.
  *
  * Operations that CREATE/DROP catalog objects such as tables and databases employ the
  * following locking protocol:
@@ -341,8 +359,12 @@ public class CatalogOpExecutor {
 
     TableName tableName = TableName.fromThrift(params.getTable_name());
     Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl());
-    catalog_.getLock().writeLock().lock();
-    synchronized (tbl) {
+
+    if (!catalog_.tryLockTable(tbl)) {
+      throw new InternalException(String.format("Error altering table %s due to lock " +
+          "contention.", tbl.getFullName()));
+    }
+    try {
       if (params.getAlter_type() == TAlterTableType.RENAME_VIEW
           || params.getAlter_type() == TAlterTableType.RENAME_TABLE) {
         // RENAME is implemented as an ADD + DROP, so we need to execute it as we hold
@@ -376,10 +398,10 @@ public class CatalogOpExecutor {
         case ADD_PARTITION:
           TAlterTableAddPartitionParams addPartParams =
               params.getAdd_partition_params();
-          // Create and add HdfsPartition object to the corresponding HdfsTable and load
-          // its block metadata. Get the new table object with an updated catalog
-          // version. If the partition already exists in Hive and "IfNotExists" is true,
-          // then return without populating the response object.
+          // Create and add HdfsPartition object to the corresponding HdfsTable and
+          // load its block metadata. Get the new table object with an updated catalog
+          // version. If the partition already exists in Hive and "IfNotExists" is
+          // true, then return without populating the response object.
           Table refreshedTable = alterTableAddPartition(tbl,
               addPartParams.getPartition_spec(), addPartParams.isIf_not_exists(),
               addPartParams.getLocation(), addPartParams.getCache_op());
@@ -429,8 +451,8 @@ public class CatalogOpExecutor {
           TAlterTableSetFileFormatParams fileFormatParams =
               params.getSet_file_format_params();
           reloadFileMetadata = alterTableSetFileFormat(
-              tbl, fileFormatParams.getPartition_set(), fileFormatParams.getFile_format(),
-              numUpdatedPartitions);
+              tbl, fileFormatParams.getPartition_set(),
+              fileFormatParams.getFile_format(), numUpdatedPartitions);
 
           if (fileFormatParams.isSetPartition_set()) {
             resultColVal.setString_val(
@@ -472,7 +494,8 @@ public class CatalogOpExecutor {
           String op = params.getSet_cached_params().getCache_op().isSet_cached() ?
               "Cached " : "Uncached ";
           if (params.getSet_cached_params().getPartition_set() == null) {
-            reloadFileMetadata = alterTableSetCached(tbl, params.getSet_cached_params());
+            reloadFileMetadata =
+                alterTableSetCached(tbl, params.getSet_cached_params());
             resultColVal.setString_val(op + "table.");
           } else {
             alterPartitionSetCached(tbl, params.getSet_cached_params(),
@@ -491,8 +514,8 @@ public class CatalogOpExecutor {
       }
 
       if (reloadMetadata) {
-        loadTableMetadata(tbl, newCatalogVersion, reloadFileMetadata, reloadTableSchema,
-            null);
+        loadTableMetadata(tbl, newCatalogVersion, reloadFileMetadata,
+            reloadTableSchema, null);
         addTableToCatalogUpdate(tbl, response.result);
       }
 
@@ -505,7 +528,10 @@ public class CatalogOpExecutor {
         resultSet.setRows(Lists.newArrayList(resultRow));
         response.setResult_set(resultSet);
       }
-    } // end of synchronized block
+    } finally {
+      Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
+      tbl.getLock().unlock();
+    }
   }
 
   /**
@@ -524,7 +550,7 @@ public class CatalogOpExecutor {
    */
   private void alterKuduTable(TAlterTableParams params, TDdlExecResponse response,
       KuduTable tbl, long newCatalogVersion) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     switch (params.getAlter_type()) {
       case ADD_REPLACE_COLUMNS:
         TAlterTableAddReplaceColsParams addReplaceColParams =
@@ -566,7 +592,7 @@ public class CatalogOpExecutor {
   private void loadTableMetadata(Table tbl, long newCatalogVersion,
       boolean reloadFileMetadata, boolean reloadTableSchema,
       Set<String> partitionsToUpdate) throws CatalogException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       org.apache.hadoop.hive.metastore.api.Table msTbl =
           getMetaStoreTable(msClient, tbl);
@@ -622,15 +648,20 @@
           "Null or empty column list given as argument to DdlExecutor.alterView");
     Table tbl = catalog_.getTable(tableName.getDb(), tableName.getTbl());
     Preconditions.checkState(tbl instanceof View);
-    catalog_.getLock().writeLock().lock();
-    synchronized(tbl) {
+
+    if (!catalog_.tryLockTable(tbl)) {
+      throw new InternalException(String.format("Error altering view %s due to lock " +
+          "contention", tbl.getFullName()));
+    }
+    try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
       // Operate on a copy of the metastore table to avoid prematurely applying the
       // alteration to our cached table in case the actual alteration fails.
       org.apache.hadoop.hive.metastore.api.Table msTbl =
           tbl.getMetaStoreTable().deepCopy();
-      if (!msTbl.getTableType().equalsIgnoreCase((TableType.VIRTUAL_VIEW.toString()))) {
+      if (!msTbl.getTableType().equalsIgnoreCase(
+          (TableType.VIRTUAL_VIEW.toString()))) {
         throw new ImpalaRuntimeException(
             String.format("ALTER VIEW not allowed on a table: %s",
                 tableName.toString()));
@@ -647,6 +678,9 @@ public class CatalogOpExecutor {
       }
       tbl.setCatalogVersion(newCatalogVersion);
       addTableToCatalogUpdate(tbl, resp.result);
+    } finally {
+      Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
+      tbl.getLock().unlock();
     }
   }
 
@@ -657,7 +691,7 @@ public class CatalogOpExecutor {
   private void alterTableUpdateStats(Table table, TAlterTableUpdateStatsParams params,
       TDdlExecResponse resp, Reference<Long> numUpdatedPartitions,
       Reference<Long> numUpdatedColumns) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(table));
+    Preconditions.checkState(table.getLock().isHeldByCurrentThread());
     if (params.isSetTable_stats()) {
       // Updating table and column stats via COMPUTE STATS.
       Preconditions.checkState(
@@ -1086,8 +1120,11 @@ public class CatalogOpExecutor {
     Table table = getExistingTable(params.getTable_name().getDb_name(),
         params.getTable_name().getTable_name());
     Preconditions.checkNotNull(table);
-    catalog_.getLock().writeLock().lock();
-    synchronized(table) {
+    if (!catalog_.tryLockTable(table)) {
+      throw new InternalException(String.format("Error dropping stats for table %s " +
+          "due to lock contention", table.getFullName()));
+    }
+    try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
       if (params.getPartition_set() == null) {
@@ -1096,8 +1133,9 @@ public class CatalogOpExecutor {
         dropColumnStats(table);
         dropTableStats(table);
       } else {
+        HdfsTable hdfsTbl = (HdfsTable) table;
         List<HdfsPartition> partitions =
-            ((HdfsTable)table).getPartitionsFromPartitionSet(params.getPartition_set());
+            hdfsTbl.getPartitionsFromPartitionSet(params.getPartition_set());
         if (partitions.isEmpty()) return;
 
         for(HdfsPartition partition : partitions) {
@@ -1113,7 +1151,10 @@ public class CatalogOpExecutor {
       }
       loadTableMetadata(table, newCatalogVersion, false, true, null);
       addTableToCatalogUpdate(table, resp.result);
-    } // end of synchronization
+    } finally {
+      Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
+      table.getLock().unlock();
+    }
   }
 
   /**
@@ -1121,7 +1162,7 @@ public class CatalogOpExecutor {
    * that were updated as part of this operation.
    */
   private int dropColumnStats(Table table) throws ImpalaRuntimeException {
-    Preconditions.checkState(Thread.holdsLock(table));
+    Preconditions.checkState(table.getLock().isHeldByCurrentThread());
     int numColsUpdated = 0;
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       for (Column col: table.getColumns()) {
@@ -1153,7 +1194,7 @@ public class CatalogOpExecutor {
    * is unpartitioned.
    */
   private int dropTableStats(Table table) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(table));
+    Preconditions.checkState(table.getLock().isHeldByCurrentThread());
     // Delete the ROW_COUNT from the table (if it was set).
     org.apache.hadoop.hive.metastore.api.Table msTbl = table.getMetaStoreTable();
     int numTargetedPartitions = 0;
@@ -1406,8 +1447,11 @@ public class CatalogOpExecutor {
           String.format("TRUNCATE TABLE not supported on non-HDFS table: %s",
           table.getFullName()));
     }
-    catalog_.getLock().writeLock().lock();
-    synchronized(table) {
+    if (!catalog_.tryLockTable(table)) {
+      throw new InternalException(String.format("Error truncating table %s due to lock " +
+          "contention", table.getFullName()));
+    }
+    try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
       try {
@@ -1427,7 +1471,10 @@ public class CatalogOpExecutor {
 
       loadTableMetadata(table, newCatalogVersion, true, true, null);
       addTableToCatalogUpdate(table, resp.result);
-    } // end synchronization
+    } finally {
+      Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
+      table.getLock().unlock();
+    }
   }
 
   private void dropFunction(TDropFunctionParams params, TDdlExecResponse resp)
@@ -1801,7 +1848,7 @@ public class CatalogOpExecutor {
    */
   private void alterTableAddReplaceCols(Table tbl, List<TColumn> columns,
       boolean replaceExistingCols) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
     List<FieldSchema> newColumns = buildFieldSchemaList(columns);
     if (replaceExistingCols) {
@@ -1821,7 +1868,7 @@ public class CatalogOpExecutor {
    */
   private void alterTableChangeCol(Table tbl, String colName,
       TColumn newCol) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
     // Find the matching column name and change it.
     Iterator<FieldSchema> iterator = msTbl.getSd().getColsIterator();
@@ -1857,7 +1904,7 @@ public class CatalogOpExecutor {
   private Table alterTableAddPartition(Table tbl, List<TPartitionKeyValue> partitionSpec,
       boolean ifNotExists, String location, THdfsCachingOp cacheOp)
       throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     TableName tableName = tbl.getTableName();
     if (ifNotExists && catalog_.containsHdfsPartition(tableName.getDb(),
         tableName.getTbl(), partitionSpec)) {
@@ -1942,7 +1989,7 @@ public class CatalogOpExecutor {
       List<List<TPartitionKeyValue>> partitionSet,
       boolean ifExists, boolean purge, Reference<Long> numUpdatedPartitions)
       throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     Preconditions.checkNotNull(partitionSet);
 
     TableName tableName = tbl.getTableName();
@@ -2003,7 +2050,7 @@ public class CatalogOpExecutor {
    * Removes a column from the given table.
    */
   private void alterTableDropCol(Table tbl, String colName) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
     // Find the matching column name and remove it.
     Iterator<FieldSchema> iterator = msTbl.getSd().getColsIterator();
@@ -2029,7 +2076,7 @@ public class CatalogOpExecutor {
    */
   private void alterTableOrViewRename(Table oldTbl, TableName newTableName,
       TDdlExecResponse response) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(oldTbl)
+    Preconditions.checkState(oldTbl.getLock().isHeldByCurrentThread()
         && catalog_.getLock().isWriteLockedByCurrentThread());
     TableName tableName = oldTbl.getTableName();
     org.apache.hadoop.hive.metastore.api.Table msTbl =
@@ -2106,7 +2153,7 @@ public class CatalogOpExecutor {
       List<List<TPartitionKeyValue>> partitionSet, THdfsFileFormat fileFormat,
       Reference<Long> numUpdatedPartitions)
       throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     Preconditions.checkState(partitionSet == null || !partitionSet.isEmpty());
     boolean reloadFileMetadata = false;
     if (partitionSet == null) {
@@ -2152,7 +2199,7 @@ public class CatalogOpExecutor {
    */
   private boolean alterTableSetLocation(Table tbl,
       List<TPartitionKeyValue> partitionSpec, String location) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     boolean reloadFileMetadata = false;
     if (partitionSpec == null) {
       org.apache.hadoop.hive.metastore.api.Table msTbl =
@@ -2181,7 +2228,7 @@ public class CatalogOpExecutor {
   private void alterTableSetTblProperties(Table tbl,
       TAlterTableSetTblPropertiesParams params, Reference<Long> numUpdatedPartitions)
       throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     Map<String, String> properties = params.getProperties();
     Preconditions.checkNotNull(properties);
     if (params.isSetPartition_set()) {
@@ -2260,7 +2307,7 @@ public class CatalogOpExecutor {
    */
   private boolean alterTableSetCached(Table tbl, TAlterTableSetCachedParams params)
       throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
     THdfsCachingOp cacheOp = params.getCache_op();
     Preconditions.checkNotNull(cacheOp);
     // Alter table params.
@@ -2385,7 +2432,7 @@ public class CatalogOpExecutor {
   private void alterPartitionSetCached(Table tbl,
       TAlterTableSetCachedParams params, Reference<Long> numUpdatedPartitions)
       throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
     THdfsCachingOp cacheOp = params.getCache_op();
     Preconditions.checkNotNull(cacheOp);
     Preconditions.checkNotNull(params.getPartition_set());
@@ -2445,7 +2492,7 @@ public class CatalogOpExecutor {
    * Add partitions to metastore which exist in HDFS but not in metastore.
    */
   private void alterTableRecoverPartitions(Table tbl) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
     if (!(tbl instanceof HdfsTable)) {
       throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
     }
@@ -2995,14 +3042,16 @@ public class CatalogOpExecutor {
           update.getTarget_table());
     }
 
-    catalog_.getLock().writeLock().lock();
-    synchronized (table) {
+    if (!catalog_.tryLockTable(table)) {
+      throw new InternalException("Error updating the catalog due to lock contention.");
+    }
+    try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
       // Collects the cache directive IDs of any cached table/partitions that were
-      // targeted. A watch on these cache directives is submitted to the TableLoadingMgr
-      // and the table will be refreshed asynchronously after all cache directives
-      // complete.
+      // targeted. A watch on these cache directives is submitted to the
+      // TableLoadingMgr and the table will be refreshed asynchronously after all
+      // cache directives complete.
       List<Long> cacheDirIds = Lists.<Long>newArrayList();
 
       // If the table is cached, get its cache pool name and replication factor. New
@@ -3020,24 +3069,27 @@ public class CatalogOpExecutor {
         // In the BE, we don't currently distinguish between which targeted partitions
         // are new and which already exist, so initialize the set with all targeted
         // partition names and remove the ones that are found to exist.
-        HashSet<String> partsToCreate = Sets.newHashSet(update.getCreated_partitions());
+        HashSet<String> partsToCreate =
+            Sets.newHashSet(update.getCreated_partitions());
         partsToLoadMetadata = Sets.newHashSet(partsToCreate);
         for (HdfsPartition partition: ((HdfsTable) table).getPartitions()) {
           // Skip dummy default partition.
-          if (partition.getId() == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) {
+          long partitionId = partition.getId();
+          if (partitionId == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) {
             continue;
           }
-          // TODO: In the BE we build partition names without a trailing char. In FE we
-          // build partition name with a trailing char. We should make this consistent.
+          // TODO: In the BE we build partition names without a trailing char. In FE
+          // we build partition name with a trailing char. We should make this
+          // consistent.
           String partName = partition.getPartitionName() + "/";
 
           // Attempt to remove this partition name from from partsToCreate. If remove
           // returns true, it indicates the partition already exists.
           if (partsToCreate.remove(partName) && partition.isMarkedCached()) {
-            // The partition was targeted by the insert and is also a cached. Since data
-            // was written to the partition, a watch needs to be placed on the cache
-            // cache directive so the TableLoadingMgr can perform an async refresh once
-            // all data becomes cached.
+            // The partition was targeted by the insert and is also a cached. Since
+            // data was written to the partition, a watch needs to be placed on the
+            // cache cache directive so the TableLoadingMgr can perform an async
+            // refresh once all data becomes cached.
             cacheDirIds.add(HdfsCachingUtil.getCacheDirectiveId(
                 partition.getParameters()));
           }
@@ -3099,7 +3151,8 @@ public class CatalogOpExecutor {
                       tblName.getTbl(), cachedHmsParts);
                 } catch (Exception e) {
                   LOG.error("Failed in alter_partitions: ", e);
-                  // Try to uncache the partitions when the alteration in the HMS failed.
+                  // Try to uncache the partitions when the alteration in the HMS
+                  // failed.
+                  // failed.
                   for (org.apache.hadoop.hive.metastore.api.Partition part:
                       cachedHmsParts) {
                     try {
@@ -3107,9 +3160,9 @@ public class CatalogOpExecutor {
                     } catch (ImpalaException e1) {
                       String msg = String.format(
                           "Partition %s.%s(%s): State: Leaked caching directive. " +
-                          "Action: Manually uncache directory %s via hdfs cacheAdmin.",
-                          part.getDbName(), part.getTableName(), part.getValues(),
-                          part.getSd().getLocation());
+                          "Action: Manually uncache directory %s via hdfs " +
+                          "cacheAdmin.", part.getDbName(), part.getTableName(),
+                          part.getValues(), part.getSd().getLocation());
                       LOG.error(msg, e);
                       errorMessages.add(msg);
                     }
@@ -3146,8 +3199,11 @@ public class CatalogOpExecutor {
 
       loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata);
       addTableToCatalogUpdate(table, response.result);
-    } // end of synchronized block
-    return response;
+      return response;
+    } finally {
+      Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
+      table.getLock().unlock();
+    }
   }
 
   private List<String> getPartValsFromName(org.apache.hadoop.hive.metastore.api.Table
