IMPALA-4449: Revisit table locking pattern in the catalog

This commit fixes an issue where multiple long-running operations on
the same catalog object (e.g. a table) can block other catalog
operations from making progress.

Problem:
IMPALA-1480 introduced table-level locking that, in conjunction with
the global catalog lock, ensures serialized access to catalog table
objects. In some cases (e.g. multiple long-running operations on the
same table), the locking pattern used resulted in the catalog lock
being held for a long period of time, blocking other catalog
operations from making any progress. That resulted in high response
times and the system appearing to be hung.

Solution:
Change the locking pattern used to protect table objects in the
catalog so that no operation holds the catalog lock for a long time
when it fails to acquire a table lock. An operation that attempts to
acquire a table lock and fails must release the catalog lock and
retry. The use of fair locks prevents starvation. The only operation
that doesn't follow this retry logic is the getCatalogObjects() call,
which retrieves a snapshot of the catalog metadata for transmission
to the statestore.

Testing:
I manually tested this change by running concurrency tests using
JMeter and verified that the throughput of catalog operations on a
specific table is not affected by other concurrent long-running
operations (e.g. refresh) on a different table.
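For readers who want the core pattern at a glance, the retry loop this
change introduces can be condensed as follows. This is a simplified,
self-contained sketch (class and field names are stand-ins; the
constants mirror TBL_LOCK_TIMEOUT_MS and TBL_LOCK_RETRY_MS from the
patch), not the exact code, which appears in
CatalogServiceCatalog.tryLockTable() in the diff below:

  import java.util.concurrent.locks.ReentrantLock;
  import java.util.concurrent.locks.ReentrantReadWriteLock;

  class LockingPatternSketch {
    // Both locks are fair, so waiting operations are served in arrival
    // order and none of them starves.
    private final ReentrantReadWriteLock catalogLock =
        new ReentrantReadWriteLock(true);
    private final ReentrantLock tableLock = new ReentrantLock(true);
    private static final long TIMEOUT_MS = 7200000;
    private static final int RETRY_MS = 10;

    // Returns true with BOTH locks held, or false with NEITHER held.
    boolean tryLockTable() {
      long begin = System.currentTimeMillis();
      do {
        catalogLock.writeLock().lock();
        if (tableLock.tryLock()) return true;
        // The table is busy. Instead of blocking on the table lock while
        // holding the catalog lock, release the catalog lock, back off
        // briefly, and retry.
        catalogLock.writeLock().unlock();
        try {
          Thread.sleep(RETRY_MS);
        } catch (InterruptedException e) {
          // Ignore and retry until the timeout expires.
        }
      } while (System.currentTimeMillis() - begin < TIMEOUT_MS);
      return false;
    }
  }

The essential property is that the catalog lock is never held while
blocking on a table lock: either both locks are acquired together, or
both are released before the short sleep and retry.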
Change-Id: Id08e21da31deb1f003b3cada4517651f3b3b2bb2
Reviewed-on: http://gerrit.cloudera.org:8080/5710
Reviewed-by: Dimitris Tsirogiannis <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3426a049
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3426a049
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3426a049

Branch: refs/heads/master
Commit: 3426a04952eb5ee906b732b4644ce77c0d24fa7e
Parents: fe2be25
Author: Dimitris Tsirogiannis <[email protected]>
Authored: Fri Jan 13 09:55:56 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Jan 20 21:13:37 2017 +0000

----------------------------------------------------------------------
 .../impala/catalog/CatalogServiceCatalog.java |  66 ++++++-
 .../java/org/apache/impala/catalog/Table.java |   4 +
 .../impala/service/CatalogOpExecutor.java     | 186 ++++++++++++-------
 3 files changed, 183 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3426a049/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 31001a2..2c42874 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -180,6 +180,42 @@ public class CatalogServiceCatalog extends Catalog {
     localLibraryPath_ = new String("file://" + localLibraryPath);
   }
 
+  // Timeout for acquiring a table lock
+  // TODO: Make this configurable
+  private static final long TBL_LOCK_TIMEOUT_MS = 7200000;
+  // Time to sleep before retrying to acquire a table lock
+  private static final int TBL_LOCK_RETRY_MS = 10;
+
+  /**
+   * Tries to acquire catalogLock_ and the lock of 'tbl' in that order. Returns true if it
+   * successfully acquires both within TBL_LOCK_TIMEOUT_MS millisecs; both locks are held
+   * when the function returns.
+   * Returns false otherwise and no lock is held in this case.
+   */
+  public boolean tryLockTable(Table tbl) {
+    long begin = System.currentTimeMillis();
+    long end;
+    do {
+      catalogLock_.writeLock().lock();
+      if (tbl.getLock().tryLock()) {
+        if (LOG.isTraceEnabled()) {
+          end = System.currentTimeMillis();
+          LOG.trace(String.format("Lock for table %s was acquired in %d msec",
+              tbl.getFullName(), end - begin));
+        }
+        return true;
+      }
+      catalogLock_.writeLock().unlock();
+      try {
+        // Sleep to avoid spinning and allow other operations to make progress.
+        Thread.sleep(TBL_LOCK_RETRY_MS);
+      } catch (InterruptedException e) {
+        // ignore
+      }
+      end = System.currentTimeMillis();
+    } while (end - begin < TBL_LOCK_TIMEOUT_MS);
+    return false;
+  }
+
   /**
    * Reads the current set of cache pools from HDFS and updates the catalog.
    * Called periodically by the cachePoolReader_.
@@ -290,7 +326,8 @@ public class CatalogServiceCatalog extends Catalog {
       }
 
       // Protect the table from concurrent modifications.
-      synchronized(tbl) {
+      tbl.getLock().lock();
+      try {
         // Only add the extended metadata if this table's version is >=
         // the fromVersion.
         if (tbl.getCatalogVersion() >= fromVersion) {
@@ -307,6 +344,8 @@ public class CatalogServiceCatalog extends Catalog {
         } else {
           catalogTbl.setTable(new TTable(db.getName(), tblName));
         }
+      } finally {
+        tbl.getLock().unlock();
       }
       resp.addToObjects(catalogTbl);
     }
@@ -919,8 +958,11 @@ public class CatalogServiceCatalog extends Catalog {
       }
     }
 
-    catalogLock_.writeLock().lock();
-    synchronized(tbl) {
+    if (!tryLockTable(tbl)) {
+      throw new CatalogException(String.format("Error refreshing metadata for table " +
+          "%s due to lock contention", tbl.getFullName()));
+    }
+    try {
       long newCatalogVersion = incrementAndGetCatalogVersion();
       catalogLock_.writeLock().unlock();
       try (MetaStoreClient msClient = getMetaStoreClient()) {
@@ -937,6 +979,9 @@ public class CatalogServiceCatalog extends Catalog {
       tbl.setCatalogVersion(newCatalogVersion);
       LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
       return tbl;
+    } finally {
+      Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread());
+      tbl.getLock().unlock();
     }
   }
 
@@ -958,7 +1003,7 @@ public class CatalogServiceCatalog extends Catalog {
       throws CatalogException {
     Preconditions.checkNotNull(tbl);
     Preconditions.checkNotNull(partitionSet);
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
     if (!(tbl instanceof HdfsTable)) {
       throw new CatalogException("Table " + tbl.getFullName() + " is not an Hdfs table");
     }
@@ -979,7 +1024,7 @@ public class CatalogServiceCatalog extends Catalog {
       throws CatalogException {
     Preconditions.checkNotNull(tbl);
     Preconditions.checkNotNull(partitionSpec);
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
     if (!(tbl instanceof HdfsTable)) {
       throw new CatalogException("Table " + tbl.getFullName() + " is not an Hdfs table");
     }
@@ -994,7 +1039,6 @@ public class CatalogServiceCatalog extends Catalog {
   public Table addPartition(HdfsPartition partition) throws CatalogException {
     Preconditions.checkNotNull(partition);
     HdfsTable hdfsTable = partition.getTable();
-    Db db = getDb(hdfsTable.getDb().getName());
     hdfsTable.addPartition(partition);
     return hdfsTable;
   }
@@ -1251,8 +1295,11 @@ public class CatalogServiceCatalog extends Catalog {
    */
  public Table reloadPartition(Table tbl, List<TPartitionKeyValue> partitionSpec)
      throws CatalogException {
-    catalogLock_.writeLock().lock();
-    synchronized (tbl) {
+    if (!tryLockTable(tbl)) {
+      throw new CatalogException(String.format("Error reloading partition of table %s " +
+          "due to lock contention", tbl.getFullName()));
+    }
+    try {
       long newCatalogVersion = incrementAndGetCatalogVersion();
       catalogLock_.writeLock().unlock();
       HdfsTable hdfsTable = (HdfsTable) tbl;
@@ -1288,6 +1335,9 @@ public class CatalogServiceCatalog extends Catalog {
       LOG.info(String.format("Refreshed partition metadata: %s %s",
           hdfsTable.getFullName(), partitionName));
       return hdfsTable;
+    } finally {
+      Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread());
+      tbl.getLock().unlock();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3426a049/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 9919dbf..01a4e55 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog;
 
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -67,6 +68,8 @@ public abstract class Table implements CatalogObject {
   protected final String owner_;
   protected TTableDescriptor tableDesc_;
   protected TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE;
+  // Lock protecting this table
+  private final ReentrantLock tableLock_ = new ReentrantLock(true);
 
   // Number of clustering columns.
   protected int numClusteringCols_;
@@ -101,6 +104,7 @@ public abstract class Table implements CatalogObject {
         CatalogServiceCatalog.getLastDdlTime(msTable_) : -1;
   }
 
+  public ReentrantLock getLock() { return tableLock_; }
   public abstract TTableDescriptor toThriftDescriptor(
       int tableId, Set<Long> referencedPartitions);
   public abstract TCatalogObjectType getCatalogObjectType();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3426a049/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index fce6e07..3c1dad8 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -169,13 +169,31 @@ import com.google.common.collect.Sets;
  *
  * The following locking protocol is employed to ensure that modifying
  * the table metadata and assigning a new catalog version is performed atomically and
- * consistently in the presence of concurrent DDL operations:
- * 1. Acquire the catalog lock (see CatalogServiceCatalog.catalogLock_)
- * 2. Acquire a table lock
- * 3. Increment and get a new catalog version
- * 4. Release the catalog lock
- * 5. Modify table metadata
- * 6. Release table lock
+ * consistently in the presence of concurrent DDL operations. The following pattern
+ * ensures that the catalog lock is never held for a long period of time, preventing
+ * other DDL operations from making progress. This pattern only applies to single-table
+ * update operations and requires the use of fair table locks to prevent starvation.
+ *
+ * DO {
+ *   Acquire the catalog lock (see CatalogServiceCatalog.catalogLock_)
+ *   Try to acquire a table lock
+ *   IF the table lock acquisition fails {
+ *     Release the catalog lock
+ *     YIELD()
+ *   } ELSE
+ *     BREAK
+ * } WHILE (NOT TIMEOUT);
+ *
+ * IF (TIMEOUT) report error
+ *
+ * Increment and get a new catalog version
+ * Release the catalog lock
+ * Modify table metadata
+ * Release table lock
+ *
+ * Note: The getCatalogObjects() function is the only case where this locking pattern is
+ * not used since it accesses multiple catalog entities in order to compute a snapshot
+ * of catalog metadata.
  *
  * Operations that CREATE/DROP catalog objects such as tables and databases employ the
  * following locking protocol:
@@ -341,8 +359,12 @@ public class CatalogOpExecutor {
     TableName tableName = TableName.fromThrift(params.getTable_name());
     Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl());
-    catalog_.getLock().writeLock().lock();
-    synchronized (tbl) {
+
+    if (!catalog_.tryLockTable(tbl)) {
+      throw new InternalException(String.format("Error altering table %s due to lock " +
+          "contention.", tbl.getFullName()));
+    }
+    try {
       if (params.getAlter_type() == TAlterTableType.RENAME_VIEW
           || params.getAlter_type() == TAlterTableType.RENAME_TABLE) {
         // RENAME is implemented as an ADD + DROP, so we need to execute it as we hold
@@ -376,10 +398,10 @@ public class CatalogOpExecutor {
       case ADD_PARTITION:
         TAlterTableAddPartitionParams addPartParams =
             params.getAdd_partition_params();
-        // Create and add HdfsPartition object to the corresponding HdfsTable and load
-        // its block metadata. Get the new table object with an updated catalog
-        // version. If the partition already exists in Hive and "IfNotExists" is true,
-        // then return without populating the response object.
+        // Create and add HdfsPartition object to the corresponding HdfsTable and
+        // load its block metadata. Get the new table object with an updated catalog
+        // version. If the partition already exists in Hive and "IfNotExists" is
+        // true, then return without populating the response object.
         Table refreshedTable = alterTableAddPartition(tbl,
             addPartParams.getPartition_spec(), addPartParams.isIf_not_exists(),
             addPartParams.getLocation(), addPartParams.getCache_op());
@@ -429,8 +451,8 @@ public class CatalogOpExecutor {
         TAlterTableSetFileFormatParams fileFormatParams =
             params.getSet_file_format_params();
         reloadFileMetadata = alterTableSetFileFormat(
-            tbl, fileFormatParams.getPartition_set(), fileFormatParams.getFile_format(),
-            numUpdatedPartitions);
+            tbl, fileFormatParams.getPartition_set(),
+            fileFormatParams.getFile_format(), numUpdatedPartitions);
 
         if (fileFormatParams.isSetPartition_set()) {
           resultColVal.setString_val(
@@ -472,7 +494,8 @@ public class CatalogOpExecutor {
         String op = params.getSet_cached_params().getCache_op().isSet_cached() ?
"Cached " : "Uncached "; if (params.getSet_cached_params().getPartition_set() == null) { - reloadFileMetadata = alterTableSetCached(tbl, params.getSet_cached_params()); + reloadFileMetadata = + alterTableSetCached(tbl, params.getSet_cached_params()); resultColVal.setString_val(op + "table."); } else { alterPartitionSetCached(tbl, params.getSet_cached_params(), @@ -491,8 +514,8 @@ public class CatalogOpExecutor { } if (reloadMetadata) { - loadTableMetadata(tbl, newCatalogVersion, reloadFileMetadata, reloadTableSchema, - null); + loadTableMetadata(tbl, newCatalogVersion, reloadFileMetadata, + reloadTableSchema, null); addTableToCatalogUpdate(tbl, response.result); } @@ -505,7 +528,10 @@ public class CatalogOpExecutor { resultSet.setRows(Lists.newArrayList(resultRow)); response.setResult_set(resultSet); } - } // end of synchronized block + } finally { + Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread()); + tbl.getLock().unlock(); + } } /** @@ -524,7 +550,7 @@ public class CatalogOpExecutor { */ private void alterKuduTable(TAlterTableParams params, TDdlExecResponse response, KuduTable tbl, long newCatalogVersion) throws ImpalaException { - Preconditions.checkState(Thread.holdsLock(tbl)); + Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); switch (params.getAlter_type()) { case ADD_REPLACE_COLUMNS: TAlterTableAddReplaceColsParams addReplaceColParams = @@ -566,7 +592,7 @@ public class CatalogOpExecutor { private void loadTableMetadata(Table tbl, long newCatalogVersion, boolean reloadFileMetadata, boolean reloadTableSchema, Set<String> partitionsToUpdate) throws CatalogException { - Preconditions.checkState(Thread.holdsLock(tbl)); + Preconditions.checkState(tbl.getLock().isHeldByCurrentThread()); try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(msClient, tbl); @@ -622,15 +648,20 @@ public class CatalogOpExecutor { "Null or empty column list given as argument to DdlExecutor.alterView"); Table tbl = catalog_.getTable(tableName.getDb(), tableName.getTbl()); Preconditions.checkState(tbl instanceof View); - catalog_.getLock().writeLock().lock(); - synchronized(tbl) { + + if (!catalog_.tryLockTable(tbl)) { + throw new InternalException(String.format("Error altering view %s due to lock " + + "contention", tbl.getFullName())); + } + try { long newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); catalog_.getLock().writeLock().unlock(); // Operate on a copy of the metastore table to avoid prematurely applying the // alteration to our cached table in case the actual alteration fails. 
       org.apache.hadoop.hive.metastore.api.Table msTbl =
           tbl.getMetaStoreTable().deepCopy();
-      if (!msTbl.getTableType().equalsIgnoreCase((TableType.VIRTUAL_VIEW.toString()))) {
+      if (!msTbl.getTableType().equalsIgnoreCase(
+          (TableType.VIRTUAL_VIEW.toString()))) {
         throw new ImpalaRuntimeException(
             String.format("ALTER VIEW not allowed on a table: %s",
                 tableName.toString()));
@@ -647,6 +678,9 @@ public class CatalogOpExecutor {
       }
       tbl.setCatalogVersion(newCatalogVersion);
       addTableToCatalogUpdate(tbl, resp.result);
+    } finally {
+      Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
+      tbl.getLock().unlock();
     }
   }
 
@@ -657,7 +691,7 @@ public class CatalogOpExecutor {
   private void alterTableUpdateStats(Table table, TAlterTableUpdateStatsParams params,
       TDdlExecResponse resp, Reference<Long> numUpdatedPartitions,
       Reference<Long> numUpdatedColumns) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(table));
+    Preconditions.checkState(table.getLock().isHeldByCurrentThread());
     if (params.isSetTable_stats()) {
       // Updating table and column stats via COMPUTE STATS.
       Preconditions.checkState(
@@ -1086,8 +1120,11 @@ public class CatalogOpExecutor {
     Table table = getExistingTable(params.getTable_name().getDb_name(),
         params.getTable_name().getTable_name());
     Preconditions.checkNotNull(table);
-    catalog_.getLock().writeLock().lock();
-    synchronized(table) {
+    if (!catalog_.tryLockTable(table)) {
+      throw new InternalException(String.format("Error dropping stats for table %s " +
+          "due to lock contention", table.getFullName()));
+    }
+    try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
       if (params.getPartition_set() == null) {
@@ -1096,8 +1133,9 @@ public class CatalogOpExecutor {
         dropColumnStats(table);
         dropTableStats(table);
       } else {
+        HdfsTable hdfsTbl = (HdfsTable) table;
         List<HdfsPartition> partitions =
-            ((HdfsTable)table).getPartitionsFromPartitionSet(params.getPartition_set());
+            hdfsTbl.getPartitionsFromPartitionSet(params.getPartition_set());
         if (partitions.isEmpty()) return;
 
         for(HdfsPartition partition : partitions) {
@@ -1113,7 +1151,10 @@ public class CatalogOpExecutor {
       }
       loadTableMetadata(table, newCatalogVersion, false, true, null);
       addTableToCatalogUpdate(table, resp.result);
-    } // end of synchronization
+    } finally {
+      Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
+      table.getLock().unlock();
+    }
   }
 
   /**
@@ -1121,7 +1162,7 @@ public class CatalogOpExecutor {
    * that were updated as part of this operation.
    */
   private int dropColumnStats(Table table) throws ImpalaRuntimeException {
-    Preconditions.checkState(Thread.holdsLock(table));
+    Preconditions.checkState(table.getLock().isHeldByCurrentThread());
     int numColsUpdated = 0;
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       for (Column col: table.getColumns()) {
@@ -1153,7 +1194,7 @@ public class CatalogOpExecutor {
    * is unpartitioned.
    */
   private int dropTableStats(Table table) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(table));
+    Preconditions.checkState(table.getLock().isHeldByCurrentThread());
     // Delete the ROW_COUNT from the table (if it was set).
     org.apache.hadoop.hive.metastore.api.Table msTbl = table.getMetaStoreTable();
     int numTargetedPartitions = 0;
@@ -1406,8 +1447,11 @@ public class CatalogOpExecutor {
           String.format("TRUNCATE TABLE not supported on non-HDFS table: %s",
           table.getFullName()));
     }
-    catalog_.getLock().writeLock().lock();
-    synchronized(table) {
+    if (!catalog_.tryLockTable(table)) {
+      throw new InternalException(String.format("Error truncating table %s due to lock " +
+          "contention", table.getFullName()));
+    }
+    try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
       try {
@@ -1427,7 +1471,10 @@ public class CatalogOpExecutor {
 
       loadTableMetadata(table, newCatalogVersion, true, true, null);
       addTableToCatalogUpdate(table, resp.result);
-    } // end synchronization
+    } finally {
+      Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
+      table.getLock().unlock();
+    }
   }
 
   private void dropFunction(TDropFunctionParams params, TDdlExecResponse resp)
@@ -1801,7 +1848,7 @@ public class CatalogOpExecutor {
    */
   private void alterTableAddReplaceCols(Table tbl, List<TColumn> columns,
       boolean replaceExistingCols) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
     List<FieldSchema> newColumns = buildFieldSchemaList(columns);
     if (replaceExistingCols) {
@@ -1821,7 +1868,7 @@ public class CatalogOpExecutor {
    */
   private void alterTableChangeCol(Table tbl, String colName,
       TColumn newCol) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
     // Find the matching column name and change it.
     Iterator<FieldSchema> iterator = msTbl.getSd().getColsIterator();
@@ -1857,7 +1904,7 @@ public class CatalogOpExecutor {
   private Table alterTableAddPartition(Table tbl, List<TPartitionKeyValue> partitionSpec,
       boolean ifNotExists, String location, THdfsCachingOp cacheOp)
       throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     TableName tableName = tbl.getTableName();
     if (ifNotExists && catalog_.containsHdfsPartition(tableName.getDb(),
         tableName.getTbl(), partitionSpec)) {
@@ -1942,7 +1989,7 @@ public class CatalogOpExecutor {
       List<List<TPartitionKeyValue>> partitionSet,
       boolean ifExists, boolean purge, Reference<Long> numUpdatedPartitions)
       throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     Preconditions.checkNotNull(partitionSet);
 
     TableName tableName = tbl.getTableName();
@@ -2003,7 +2050,7 @@ public class CatalogOpExecutor {
    * Removes a column from the given table.
    */
   private void alterTableDropCol(Table tbl, String colName) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
 
     // Find the matching column name and remove it.
     Iterator<FieldSchema> iterator = msTbl.getSd().getColsIterator();
@@ -2029,7 +2076,7 @@ public class CatalogOpExecutor {
    */
   private void alterTableOrViewRename(Table oldTbl, TableName newTableName,
       TDdlExecResponse response) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(oldTbl)
+    Preconditions.checkState(oldTbl.getLock().isHeldByCurrentThread()
         && catalog_.getLock().isWriteLockedByCurrentThread());
     TableName tableName = oldTbl.getTableName();
     org.apache.hadoop.hive.metastore.api.Table msTbl =
@@ -2106,7 +2153,7 @@ public class CatalogOpExecutor {
       List<List<TPartitionKeyValue>> partitionSet, THdfsFileFormat fileFormat,
       Reference<Long> numUpdatedPartitions)
       throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     Preconditions.checkState(partitionSet == null || !partitionSet.isEmpty());
     boolean reloadFileMetadata = false;
     if (partitionSet == null) {
@@ -2152,7 +2199,7 @@ public class CatalogOpExecutor {
    */
   private boolean alterTableSetLocation(Table tbl,
       List<TPartitionKeyValue> partitionSpec, String location) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     boolean reloadFileMetadata = false;
     if (partitionSpec == null) {
       org.apache.hadoop.hive.metastore.api.Table msTbl =
@@ -2181,7 +2228,7 @@ public class CatalogOpExecutor {
   private void alterTableSetTblProperties(Table tbl,
       TAlterTableSetTblPropertiesParams params, Reference<Long> numUpdatedPartitions)
       throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     Map<String, String> properties = params.getProperties();
     Preconditions.checkNotNull(properties);
     if (params.isSetPartition_set()) {
@@ -2260,7 +2307,7 @@ public class CatalogOpExecutor {
    */
   private boolean alterTableSetCached(Table tbl, TAlterTableSetCachedParams params)
       throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
     THdfsCachingOp cacheOp = params.getCache_op();
     Preconditions.checkNotNull(cacheOp);
     // Alter table params.
@@ -2385,7 +2432,7 @@ public class CatalogOpExecutor {
   private void alterPartitionSetCached(Table tbl,
       TAlterTableSetCachedParams params, Reference<Long> numUpdatedPartitions)
       throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
     THdfsCachingOp cacheOp = params.getCache_op();
     Preconditions.checkNotNull(cacheOp);
     Preconditions.checkNotNull(params.getPartition_set());
@@ -2445,7 +2492,7 @@ public class CatalogOpExecutor {
    * Add partitions to metastore which exist in HDFS but not in metastore.
    */
   private void alterTableRecoverPartitions(Table tbl) throws ImpalaException {
-    Preconditions.checkState(Thread.holdsLock(tbl));
+    Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
     if (!(tbl instanceof HdfsTable)) {
       throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
     }
@@ -2995,14 +3042,16 @@ public class CatalogOpExecutor {
           update.getTarget_table());
     }
 
-    catalog_.getLock().writeLock().lock();
-    synchronized (table) {
+    if (!catalog_.tryLockTable(table)) {
+      throw new InternalException("Error updating the catalog due to lock contention.");
+    }
+    try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
       // Collects the cache directive IDs of any cached table/partitions that were
-      // targeted. A watch on these cache directives is submitted to the TableLoadingMgr
-      // and the table will be refreshed asynchronously after all cache directives
-      // complete.
+      // targeted. A watch on these cache directives is submitted to the
+      // TableLoadingMgr and the table will be refreshed asynchronously after all
+      // cache directives complete.
       List<Long> cacheDirIds = Lists.<Long>newArrayList();
 
       // If the table is cached, get its cache pool name and replication factor. New
@@ -3020,24 +3069,27 @@ public class CatalogOpExecutor {
         // In the BE, we don't currently distinguish between which targeted partitions
         // are new and which already exist, so initialize the set with all targeted
        // partition names and remove the ones that are found to exist.
-        HashSet<String> partsToCreate = Sets.newHashSet(update.getCreated_partitions());
+        HashSet<String> partsToCreate =
+            Sets.newHashSet(update.getCreated_partitions());
         partsToLoadMetadata = Sets.newHashSet(partsToCreate);
         for (HdfsPartition partition: ((HdfsTable) table).getPartitions()) {
           // Skip dummy default partition.
-          if (partition.getId() == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) {
+          long partitionId = partition.getId();
+          if (partitionId == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) {
             continue;
           }
-          // TODO: In the BE we build partition names without a trailing char. In FE we
-          // build partition name with a trailing char. We should make this consistent.
+          // TODO: In the BE we build partition names without a trailing char. In FE
+          // we build partition name with a trailing char. We should make this
+          // consistent.
           String partName = partition.getPartitionName() + "/";
           // Attempt to remove this partition name from partsToCreate. If remove
           // returns true, it indicates the partition already exists.
           if (partsToCreate.remove(partName) && partition.isMarkedCached()) {
-            // The partition was targeted by the insert and is also a cached. Since data
-            // was written to the partition, a watch needs to be placed on the cache
-            // cache directive so the TableLoadingMgr can perform an async refresh once
-            // all data becomes cached.
+            // The partition was targeted by the insert and is also cached. Since
+            // data was written to the partition, a watch needs to be placed on the
+            // cache directive so the TableLoadingMgr can perform an async
+            // refresh once all data becomes cached.
             cacheDirIds.add(HdfsCachingUtil.getCacheDirectiveId(
                 partition.getParameters()));
           }
@@ -3099,7 +3151,8 @@ public class CatalogOpExecutor {
             tblName.getTbl(), cachedHmsParts);
       } catch (Exception e) {
         LOG.error("Failed in alter_partitions: ", e);
-        // Try to uncache the partitions when the alteration in the HMS failed.
+        // Try to uncache the partitions when the alteration in the HMS
+        // failed.
         for (org.apache.hadoop.hive.metastore.api.Partition part:
             cachedHmsParts) {
           try {
@@ -3107,9 +3160,9 @@ public class CatalogOpExecutor {
           } catch (ImpalaException e1) {
             String msg = String.format(
                 "Partition %s.%s(%s): State: Leaked caching directive. " +
-                "Action: Manually uncache directory %s via hdfs cacheAdmin.",
-                part.getDbName(), part.getTableName(), part.getValues(),
-                part.getSd().getLocation());
+                "Action: Manually uncache directory %s via hdfs " +
+                "cacheAdmin.", part.getDbName(), part.getTableName(),
+                part.getValues(), part.getSd().getLocation());
             LOG.error(msg, e);
             errorMessages.add(msg);
           }
@@ -3146,8 +3199,11 @@ public class CatalogOpExecutor {
 
       loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata);
       addTableToCatalogUpdate(table, response.result);
-    } // end of synchronized block
-    return response;
+      return response;
+    } finally {
+      Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
+      table.getLock().unlock();
+    }
   }
 
   private List<String> getPartValsFromName(org.apache.hadoop.hive.metastore.api.Table
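For illustration, every single-table operation updated by this patch follows
the caller-side shape sketched below. This is a sketch under simplified
assumptions (doAlterWork() is a hypothetical placeholder for the
operation-specific metadata changes; the other calls mirror methods shown in
the diff above), not code copied from the commit:

  void alterTableExample(CatalogServiceCatalog catalog, Table tbl)
      throws Exception {
    // Acquire the catalog lock and the table lock together, with retry
    // and timeout.
    if (!catalog.tryLockTable(tbl)) {
      throw new Exception("Error altering table due to lock contention");
    }
    try {
      // Assign the new catalog version while both locks are held...
      long newCatalogVersion = catalog.incrementAndGetCatalogVersion();
      // ...then release the catalog lock early so other DDL operations
      // can make progress.
      catalog.getLock().writeLock().unlock();
      // Long-running metadata work is protected only by the table lock.
      // doAlterWork(tbl, newCatalogVersion);  // hypothetical placeholder
    } finally {
      // Always release the table lock, even on failure.
      tbl.getLock().unlock();
    }
  }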