This is an automated email from the ASF dual-hosted git repository. laszlog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push: new dbac6ab13 IMPALA-14400: Fix deadlock in CatalogServiceCatalog.getDbProperty() dbac6ab13 is described below commit dbac6ab13ad5cbd40ac31e9921265396de6c9433 Author: Riza Suminto <riza.sumi...@cloudera.com> AuthorDate: Thu Sep 4 13:52:25 2025 -0700 IMPALA-14400: Fix deadlock in CatalogServiceCatalog.getDbProperty() IMPALA-13850 (part 4) modified CatalogServiceCatalog.getDb() to delay looking up the catalog cache until the initial reset() is complete. EventProcessor can start processing events before reset() happens and obtain versionLock_.readLock() when calling CatalogServiceCatalog.getDbProperty(). Later on, it deadlocks when attempting to obtain versionLock_.writeLock() through getDb() / waitInitialResetCompletion(). This lock upgrade from read to write is unsafe: ReentrantReadWriteLock does not support upgrading a held read lock to the write lock, so the thread ends up waiting on its own read lock. This patch mitigates the issue by changing waitInitialResetCompletion() to not acquire the write lock. After this patch, it sleeps for 100ms before looping and checking again whether the initial reset has completed. Modified CatalogResetManager.fetchingDbs_ to be a ConcurrentLinkedQueue so that isActive() can be called without holding the write lock. Added helper classes ReadLockAndLookupDb and WriteLockAndLookupDb. Both call waitInitialResetCompletion() before obtaining the appropriate lock. In the case of WriteLockAndLookupDb, it additionally calls resetManager_.waitOngoingMetadataFetch() to ensure the dbCache_ lookup is safe for write purposes. Skipped calling catalog_.startEventsProcessor() in the JniCatalog constructor. Instead, CatalogServiceCatalog.reset() now starts it at the end of cache population. Added @Nullable annotations on CatalogServiceCatalog methods that can return null. Fixed some null check warnings that showed up afterwards. Removed dead code CatalogServiceCatalog.addUserIfNotExists() and CatalogOpExecutor.getCurrentEventId(). Testing: Increased TRIGGER_RESET_METADATA_DELAY from 1s to 3s in test_metadata_after_failover_with_delayed_reset. 
It was easy to hit the deadlock with a 3s delay before the patch. No deadlocks happened after the patch. Ran and passed test_catalogd_ha.py and test_restart_services.py exhaustively. Change-Id: I3162472ea9531add77886bf1d0d73460ff34d07a Reviewed-on: http://gerrit.cloudera.org:8080/23382 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Riza Suminto <riza.sumi...@cloudera.com> --- .../apache/impala/catalog/CatalogResetManager.java | 17 +- .../impala/catalog/CatalogServiceCatalog.java | 274 ++++++++++++--------- .../apache/impala/service/CatalogOpExecutor.java | 24 +- .../java/org/apache/impala/service/JniCatalog.java | 2 +- tests/custom_cluster/test_catalogd_ha.py | 2 +- 5 files changed, 179 insertions(+), 140 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogResetManager.java b/fe/src/main/java/org/apache/impala/catalog/CatalogResetManager.java index db2370cae..688fa4da0 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogResetManager.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogResetManager.java @@ -36,6 +36,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -79,7 +80,7 @@ public class CatalogResetManager { // A queue of database that undergoes metadata fetch. // If not empty, the elements are always in lexicographic order head to tail and should // not contain any blacklisted Dbs. If empty, then no reset operation is currently - // running. + // running. Initialized as ConcurrentLinkedQueue in constructor. private final Queue<Pair<String, Future<PrefetchedDatabaseObjects>>> fetchingDbs_; // A queue of database names that are pending to be fetched. 
@@ -93,7 +94,7 @@ public class CatalogResetManager { public CatalogResetManager(CatalogServiceCatalog catalog) { catalog_ = catalog; fetchMetadataCondition_ = catalog.getLock().writeLock().newCondition(); - fetchingDbs_ = new LinkedList<>(); + fetchingDbs_ = new ConcurrentLinkedQueue<>(); pendingDbNames_ = new LinkedList<>(); } @@ -141,9 +142,10 @@ public class CatalogResetManager { /** * Returns True if there is an ongoing fetch operation. + * Does not need to hold versionLock_.writeLock() to call, but the status might change + * after method returns. */ protected boolean isActive() { - Preconditions.checkState(threadIsHoldingWriteLock()); return !fetchingDbs_.isEmpty(); } @@ -161,6 +163,7 @@ public class CatalogResetManager { /** * Signal all threads waiting on resetMetadataCondition_. + * Must hold versionLock_.writeLock(). */ protected void signalAllWaiters() { Preconditions.checkState(threadIsHoldingWriteLock()); @@ -169,14 +172,16 @@ public class CatalogResetManager { /** * Peek the next fetching database. + * Does not need to hold versionLock_.writeLock() to call, but it might change + * after method return if something else call {@link #pollFetchingDb()} concurrently. */ protected Pair<String, Future<PrefetchedDatabaseObjects>> peekFetchingDb() { - Preconditions.checkState(threadIsHoldingWriteLock()); return fetchingDbs_.peek(); } /** * Poll the next fetching database and schedule the next reset task. + * Must hold versionLock_.writeLock(). */ protected Pair<String, Future<PrefetchedDatabaseObjects>> pollFetchingDb() { Preconditions.checkState(threadIsHoldingWriteLock()); @@ -188,6 +193,7 @@ public class CatalogResetManager { /** * Return a list of all currently resetting databases. + * Must hold versionLock_.writeLock(). */ protected List<String> allFetcingDbList() { Preconditions.checkState(threadIsHoldingWriteLock()); @@ -198,6 +204,7 @@ public class CatalogResetManager { /** * Wait until all parallel fetch finish. + * Must hold versionLock_.writeLock(). 
*/ protected void waitFullMetadataFetch() { Preconditions.checkState(threadIsHoldingWriteLock()); @@ -216,6 +223,7 @@ public class CatalogResetManager { /** * Wait until it is ensured that given 'dbName' has been polled out. * This method will lower case 'dbName' for matching. + * Must hold versionLock_.writeLock(). */ protected void waitOngoingMetadataFetch(String dbName) { waitOngoingMetadataFetch(ImmutableList.of(dbName)); @@ -224,6 +232,7 @@ public class CatalogResetManager { /** * Wait until it is ensured that all 'dbNames' has been polled out. * This method will lower case 'dbNames' and sort them for matching. + * Must hold versionLock_.writeLock(). */ protected void waitOngoingMetadataFetch(List<String> dbNames) { Preconditions.checkState(threadIsHoldingWriteLock()); diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index d8a639123..9d6c46e59 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -89,10 +89,10 @@ import org.apache.impala.catalog.events.MetastoreNotificationFetchException; import org.apache.impala.catalog.events.NoOpEventProcessor; import org.apache.impala.catalog.events.SelfEventContext; import org.apache.impala.catalog.metastore.CatalogHmsUtils; -import org.apache.impala.catalog.monitor.CatalogMonitor; -import org.apache.impala.catalog.monitor.CatalogTableMetrics; import org.apache.impala.catalog.metastore.HmsApiNameEnum; import org.apache.impala.catalog.metastore.ICatalogMetastoreServer; +import org.apache.impala.catalog.monitor.CatalogMonitor; +import org.apache.impala.catalog.monitor.CatalogTableMetrics; import org.apache.impala.catalog.monitor.TableLoadingTimeHistogram; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.ImpalaException; @@ -109,11 +109,11 @@ import 
org.apache.impala.service.JniCatalog; import org.apache.impala.thrift.CatalogLookupStatus; import org.apache.impala.thrift.CatalogServiceConstants; import org.apache.impala.thrift.TCatalog; -import org.apache.impala.thrift.TCatalogdHmsCacheMetrics; import org.apache.impala.thrift.TCatalogInfoSelector; import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TCatalogUpdateResult; +import org.apache.impala.thrift.TCatalogdHmsCacheMetrics; import org.apache.impala.thrift.TDataSource; import org.apache.impala.thrift.TDatabase; import org.apache.impala.thrift.TErrorCode; @@ -171,11 +171,11 @@ import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.base.Stopwatch; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -272,6 +272,8 @@ public class CatalogServiceCatalog extends Catalog { public static final long LOCK_RETRY_TIMEOUT_MS = 7200000; // Time to sleep before retrying to acquire a table lock private static final int LOCK_RETRY_DELAY_MS = 10; + // Time to sleep before retrying to check initial reset completion. + private static final int INITIAL_RESET_DELAY_CHECK_MS = 100; // Threshold to add warning logs for slow lock acquiring. 
private static final long LOCK_ACQUIRING_DURATION_WARN_MS = 100; // default value of table id in the GetPartialCatalogObjectRequest @@ -291,6 +293,7 @@ public class CatalogServiceCatalog extends Catalog { private final ReentrantReadWriteLock versionLock_ = new ReentrantReadWriteLock(true); // Executor service for fetching catalog objects from Metastore in the background. + // Most access must hold versionLock_.writeLock() unless when documented otherwise. private final CatalogResetManager resetManager_ = new CatalogResetManager(this); // Last assigned catalog version. Starts at INITIAL_CATALOG_VERSION and is incremented @@ -489,6 +492,7 @@ public class CatalogServiceCatalog extends Catalog { /** * If initial reset has just begin, wait until it is completed. + * No-op if initial reset has already completed or has not yet started. * @param dbName if supplied, wait can return early if the db is found in the cache. */ private void waitInitialResetCompletion(@Nullable String dbName) { @@ -499,7 +503,6 @@ public class CatalogServiceCatalog extends Catalog { break; } - versionLock_.writeLock().lock(); try { if (!resetManager_.isActive()) { // Catalog is not currently resetting, so we can stop wait. @@ -514,14 +517,10 @@ public class CatalogServiceCatalog extends Catalog { (dbName != null ? " for db " + dbName : "")); isWaiting = true; } - if (dbName != null) { - resetManager_.waitOngoingMetadataFetch(dbName); - } else { - resetManager_.waitFullMetadataFetch(); - } + Thread.sleep(INITIAL_RESET_DELAY_CHECK_MS); } - } finally { - versionLock_.writeLock().unlock(); + } catch (InterruptedException e) { + // ignore; } } if (isWaiting) { @@ -529,12 +528,21 @@ public class CatalogServiceCatalog extends Catalog { } } + /** + * Gets the Db object from the Catalog using a case-insensitive lookup on the name. + * Returns null if no matching database is found. + * Caller must not hold the versionLock_.readLock() to avoid deadlock. 
+ */ @Override - public Db getDb(String dbName) { + public @Nullable Db getDb(String dbName) { waitInitialResetCompletion(dbName); return super.getDb(dbName); } + /** + * Returns all databases that match 'matcher'. + * Caller must not hold the versionLock_.readLock() to avoid deadlock. + */ @Override public List<Db> getDbs(PatternMatcher matcher) { waitInitialResetCompletion(null); @@ -1458,6 +1466,62 @@ public class CatalogServiceCatalog extends Catalog { } } + /** + * Utillity class to facilitate simple read or write locking of versionLock_ and + * lookup the requested database from dbCache_. The lock is released when close() + * is called, which is automatically called when using try-with-resources statement. + * Use either the ReadLockAndLookupDb or WriteLockAndLookupDb subclass to get + * the desired lock type. + */ + private abstract class LockCatalogAndLookupDb implements AutoCloseable { + private final String dbName_; + private final boolean forWrite_; + + public LockCatalogAndLookupDb(String dbName, boolean forWrite) { + Preconditions.checkArgument(dbName != null && !dbName.isEmpty(), + "Null or empty database name given as argument to Catalog.getDb"); + + // The remaining constructor code must be exception-safe since this + // constructor is called from a try-with-resources statement. + dbName_ = dbName; + forWrite_ = forWrite; + + waitInitialResetCompletion(dbName_); + if (forWrite_) { + versionLock_.writeLock().lock(); + } else { + versionLock_.readLock().lock(); + } + } + + protected @Nullable Db getDb() { + if (forWrite_) { + // If we are holding the write lock, wait for any ongoing reset to complete + // before returning the Db. This ensures that writers always see the latest + // catalog state. 
+ resetManager_.waitOngoingMetadataFetch(dbName_); + } + return dbCache_.get(dbName_); + } + + @Override + public void close() { + if (forWrite_) { + versionLock_.writeLock().unlock(); + } else { + versionLock_.readLock().unlock(); + } + } + } + + private class ReadLockAndLookupDb extends LockCatalogAndLookupDb { + public ReadLockAndLookupDb(String dbName) { super(dbName, false); } + } + + private class WriteLockAndLookupDb extends LockCatalogAndLookupDb { + public WriteLockAndLookupDb(String dbName) { super(dbName, true); } + } + /** * A helper class to iterate list of Db. * iterator() method will return an iterator that will wait for ongoing @@ -1641,17 +1705,14 @@ public class CatalogServiceCatalog extends Catalog { * @return value of key from the db parameter. returns null if Db is not found or key * does not exist in the parameters */ - public String getDbProperty(String dbName, String propertyKey) { + public @Nullable String getDbProperty(String dbName, String propertyKey) { Preconditions.checkNotNull(dbName); Preconditions.checkNotNull(propertyKey); - versionLock_.readLock().lock(); - try { - Db db = getDb(dbName); + try (ReadLockAndLookupDb result = new ReadLockAndLookupDb(dbName)) { + Db db = result.getDb(); if (db == null) return null; if (!db.getMetaStoreDb().isSetParameters()) return null; return db.getMetaStoreDb().getParameters().get(propertyKey); - } finally { - versionLock_.readLock().unlock(); } } @@ -1661,14 +1722,13 @@ public class CatalogServiceCatalog extends Catalog { * @return Value of the parameter which maps to property key, null if the table * doesn't exist, if it is a incomplete table or if the parameter is not found */ - public List<String> getTableProperties( + public @Nullable List<String> getTableProperties( String dbName, String tblName, List<String> propertyKeys) { Preconditions.checkNotNull(dbName); Preconditions.checkNotNull(tblName); Preconditions.checkNotNull(propertyKeys); - versionLock_.readLock().lock(); - try { - Db db = 
getDb(dbName); + try (ReadLockAndLookupDb result = new ReadLockAndLookupDb(dbName)) { + Db db = result.getDb(); if (db == null) return null; Table tbl = db.getTable(tblName); if (tbl == null || tbl instanceof IncompleteTable) return null; @@ -1678,8 +1738,6 @@ public class CatalogServiceCatalog extends Catalog { propertyValues.add(tbl.getMetaStoreTable().getParameters().get(propertyKey)); } return propertyValues; - } finally { - versionLock_.readLock().unlock(); } } @@ -1695,18 +1753,14 @@ public class CatalogServiceCatalog extends Catalog { public Db updateDb(Database msDb) throws DatabaseNotFoundException { Preconditions.checkNotNull(msDb); Preconditions.checkNotNull(msDb.getName()); - versionLock_.writeLock().lock(); - try { - resetManager_.waitOngoingMetadataFetch(msDb.getName()); - Db db = getDb(msDb.getName()); + try (WriteLockAndLookupDb result = new WriteLockAndLookupDb(msDb.getName())) { + Db db = result.getDb(); if (db == null) { throw new DatabaseNotFoundException("Database " + msDb.getName() + " not found"); } db.setMetastoreDb(msDb.getName(), msDb); db.setCatalogVersion(incrementAndGetCatalogVersion()); return db; - } finally { - versionLock_.writeLock().unlock(); } } @@ -2295,7 +2349,7 @@ public class CatalogServiceCatalog extends Catalog { * 'currentEventId' is set as createEventId of all tables under this db. Also used to * track removed tables in EventDeleteLog. 
*/ - private Pair<Db, List<TTableName>> invalidateDb(String dbName, Db existingDb, + private @Nullable Pair<Db, List<TTableName>> invalidateDb(String dbName, Db existingDb, PrefetchedDatabaseObjects prefetchedObjects, EventSequence catalogTimeline, long currentEventId) { try { @@ -2487,8 +2541,13 @@ public class CatalogServiceCatalog extends Catalog { // clear all txn to write ids mapping so that there is no memory leak for previous // events clearWriteIds(); - // restart the event processing for id just before the reset - metastoreEventProcessor_.start(currentEventId); + if (isCatalogServerRequest) { + // start cleanly if request comes from catalog server. + metastoreEventProcessor_.start(); + } else { + // restart the event processing for id just before the reset + metastoreEventProcessor_.start(currentEventId); + } } unlockedTimer.stop(); resetTimer.stop(); @@ -2697,22 +2756,20 @@ public class CatalogServiceCatalog extends Catalog { deleteLog_.addRemovedObject(db.toTCatalogObject()); } - public Table addIncompleteTable(String dbName, String tblName, TImpalaTableType tblType, - String tblComment) { + public @Nullable Table addIncompleteTable( + String dbName, String tblName, TImpalaTableType tblType, String tblComment) { return addIncompleteTable(dbName, tblName, tblType, tblComment, -1L); } /** * Adds a table with the given name to the catalog and returns the new table. 
*/ - public Table addIncompleteTable(String dbName, String tblName, TImpalaTableType tblType, - String tblComment, long createEventId) { - versionLock_.writeLock().lock(); - try { - resetManager_.waitOngoingMetadataFetch(dbName); + public @Nullable Table addIncompleteTable(String dbName, String tblName, + TImpalaTableType tblType, String tblComment, long createEventId) { + try (WriteLockAndLookupDb result = new WriteLockAndLookupDb(dbName)) { // IMPALA-9211: get db object after holding the writeLock in case of getting stale // db object due to concurrent INVALIDATE METADATA - Db db = getDb(dbName); + Db db = result.getDb(); if (db == null) return null; Table existingTbl = db.getTable(tblName); if (existingTbl instanceof HdfsTable) { @@ -2726,8 +2783,6 @@ public class CatalogServiceCatalog extends Catalog { incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion()); db.addTable(incompleteTable); return db.getTable(tblName); - } finally { - versionLock_.writeLock().unlock(); } } @@ -2881,10 +2936,9 @@ public class CatalogServiceCatalog extends Catalog { */ private @Nullable Table replaceTableIfUnchanged(Table updatedTbl, long expectedCatalogVersion, long tableId) throws DatabaseNotFoundException { - versionLock_.writeLock().lock(); - try { - resetManager_.waitOngoingMetadataFetch(updatedTbl.getDb().getName()); - Db db = getDb(updatedTbl.getDb().getName()); + try (WriteLockAndLookupDb result = + new WriteLockAndLookupDb(updatedTbl.getDb().getName())) { + Db db = result.getDb(); if (db == null) { throw new DatabaseNotFoundException( "Database does not exist: " + updatedTbl.getDb().getName()); @@ -2941,8 +2995,6 @@ public class CatalogServiceCatalog extends Catalog { // MAX_NUM_SKIPPED_TOPIC_UPDATES limit. db.addTable(updatedTbl); return updatedTbl; - } finally { - versionLock_.writeLock().unlock(); } } @@ -2950,11 +3002,9 @@ public class CatalogServiceCatalog extends Catalog { * Removes a table from the catalog and increments the catalog version. 
* Returns the removed Table, or null if the table or db does not exist. */ - public Table removeTable(String dbName, String tblName) { - versionLock_.writeLock().lock(); - try { - resetManager_.waitOngoingMetadataFetch(dbName); - Db parentDb = getDb(dbName); + public @Nullable Table removeTable(String dbName, String tblName) { + try (WriteLockAndLookupDb result = new WriteLockAndLookupDb(dbName)) { + Db parentDb = result.getDb(); if (parentDb == null) return null; Table removedTable = parentDb.removeTable(tblName); if (removedTable != null && !removedTable.isStoredInImpaladCatalogCache()) { @@ -2965,8 +3015,6 @@ public class CatalogServiceCatalog extends Catalog { deleteLog_.addRemovedObject(removedTable.toMinimalTCatalogObject()); } return removedTable; - } finally { - versionLock_.writeLock().unlock(); } } @@ -2997,17 +3045,13 @@ public class CatalogServiceCatalog extends Catalog { */ @Override public boolean addFunction(Function fn) { - versionLock_.writeLock().lock(); - try { - resetManager_.waitOngoingMetadataFetch(fn.dbName()); - Db db = getDb(fn.getFunctionName().getDb()); + try (WriteLockAndLookupDb result = new WriteLockAndLookupDb(fn.dbName())) { + Db db = result.getDb(); if (db == null) return false; if (db.addFunction(fn)) { fn.setCatalogVersion(incrementAndGetCatalogVersion()); return true; } - } finally { - versionLock_.writeLock().unlock(); } return false; } @@ -3031,7 +3075,7 @@ public class CatalogServiceCatalog extends Catalog { } @Override - public DataSource removeDataSource(String dataSourceName) { + public @Nullable DataSource removeDataSource(String dataSourceName) { versionLock_.writeLock().lock(); try { DataSource dataSource = dataSources_.remove(dataSourceName); @@ -3052,24 +3096,34 @@ public class CatalogServiceCatalog extends Catalog { * add of the new table was not successful). Depending on the return value, the catalog * cache is in one of the following states: * 1. null, null: Old table was not removed and new table was not added. 
- * 2. null, T_new: Invalid configuration - * 3. T_old, null: Old table was removed but new table was not added. - * 4. T_old, T_new: Old table was removed and new table was added. + * 2. T_old, null: Old table was removed but new table was not added. + * 3. T_old, T_new: Old table was removed and new table was added. * Updates 'createEventId' of the new table using 'alterEventId'. */ public Pair<Table, Table> renameTable( TTableName oldTableName, TTableName newTableName, long alterEventId) { - // Remove the old table name from the cache and add the new table. - Db db = getDb(oldTableName.getDb_name()); - if (db == null) return Pair.create(null, null); versionLock_.writeLock().lock(); try { + // Make sure any ongoing metadata fetch has progress beyond oldTableName + // and newTableName. List<String> dbNames = ImmutableList.of(oldTableName.getDb_name(), newTableName.getDb_name()); resetManager_.waitOngoingMetadataFetch(dbNames); + + // Remove the old table name from the cache and add the new table. + Db db = getDb(oldTableName.getDb_name()); + if (db == null) { + // Case 1 - db does not exist. + return Pair.create(null, null); + } + Table oldTable = removeTable(oldTableName.getDb_name(), oldTableName.getTable_name()); - if (oldTable == null) return Pair.create(null, null); + if (oldTable == null) { + // Case 1 - old table did not exist. + return Pair.create(null, null); + } + if (alterEventId < oldTable.getCreateEventId()) { // This is usually due to alterEventId = -1, e.g. failed to fetch and check // HMS events in the callers. Fallback to use the original createEventId. @@ -3077,6 +3131,8 @@ public class CatalogServiceCatalog extends Catalog { LOG.warn("Reusing original createEventId {} for table {}.{}", alterEventId, newTableName.getDb_name(), newTableName.getTable_name()); } + + // Case 2 or 3. 
return Pair.create(oldTable, addIncompleteTable(newTableName.getDb_name(), newTableName.getTable_name(), oldTable.getTableType(), oldTable.getTableComment(), alterEventId)); @@ -3261,7 +3317,7 @@ public class CatalogServiceCatalog extends Catalog { return hdfsTable; } - public TCatalogObject invalidateTable(TTableName tableName, + public @Nullable TCatalogObject invalidateTable(TTableName tableName, Reference<Boolean> tblWasRemoved, Reference<Boolean> dbWasAdded, EventSequence catalogTimeline) { return invalidateTable(tableName, tblWasRemoved, dbWasAdded, catalogTimeline, -1L); @@ -3288,7 +3344,7 @@ public class CatalogServiceCatalog extends Catalog { * 'eventId' is used to update createEventId of the table which avoids the table being * dropped in processing older events. */ - public TCatalogObject invalidateTable(TTableName tableName, + public @Nullable TCatalogObject invalidateTable(TTableName tableName, Reference<Boolean> tblWasRemoved, Reference<Boolean> dbWasAdded, EventSequence catalogTimeline, long eventId) { tblWasRemoved.setRef(false); @@ -3338,7 +3394,7 @@ public class CatalogServiceCatalog extends Catalog { // The table does not exist in our cache AND it is unknown whether the // table exists in the Metastore. Do nothing. return null; - } else if (db == null && !metaRes.isEmpty()) { + } else if (db == null && metaRes != null && !metaRes.isEmpty()) { // The table exists in the Metastore, but our cache does not contain the parent // database. A new db will be added to the cache along with the new table. msDb // must be valid since tableExistsInMetaStore is true. @@ -3384,12 +3440,10 @@ public class CatalogServiceCatalog extends Catalog { * Table. 
* @return null if the table does not exist else return the invalidated table */ - public Table invalidateTableIfExists(String dbName, String tblName) { + public @Nullable Table invalidateTableIfExists(String dbName, String tblName) { Table incompleteTable; - versionLock_.writeLock().lock(); - try { - resetManager_.waitOngoingMetadataFetch(dbName); - Db db = getDb(dbName); + try (WriteLockAndLookupDb result = new WriteLockAndLookupDb(dbName)) { + Db db = result.getDb(); if (db == null) return null; Table existingTbl = db.getTable(tblName); if (existingTbl == null) return null; @@ -3398,8 +3452,6 @@ public class CatalogServiceCatalog extends Catalog { existingTbl.getCreateEventId()); incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion()); db.addTable(incompleteTable); - } finally { - versionLock_.writeLock().unlock(); } scheduleTableLoading(dbName, tblName); return incompleteTable; @@ -3504,25 +3556,6 @@ public class CatalogServiceCatalog extends Catalog { return (User) user; } - /** - * Add a user to the catalog if it doesn't exist. This is necessary so privileges - * can be added for a user. example: owner privileges. - */ - public User addUserIfNotExists(String owner, Reference<Boolean> existingUser) { - versionLock_.writeLock().lock(); - try { - User user = getAuthPolicy().getUser(owner); - existingUser.setRef(Boolean.TRUE); - if (user == null) { - user = addUser(owner); - existingUser.setRef(Boolean.FALSE); - } - return user; - } finally { - versionLock_.writeLock().unlock(); - } - } - private Principal addPrincipal(String principalName, Set<String> grantGroups, TPrincipalType type) { versionLock_.writeLock().lock(); @@ -3541,7 +3574,7 @@ public class CatalogServiceCatalog extends Catalog { * removed role with an incremented catalog version, or null if no role with this name * exists. 
*/ - public Role removeRole(String roleName) { + public @Nullable Role removeRole(String roleName) { Principal role = removePrincipal(roleName, TPrincipalType.ROLE); if (role == null) return null; Preconditions.checkState(role instanceof Role); @@ -3553,14 +3586,14 @@ public class CatalogServiceCatalog extends Catalog { * removed user with an incremented catalog version, or null if no user with this name * exists. */ - public User removeUser(String userName) { + public @Nullable User removeUser(String userName) { Principal user = removePrincipal(userName, TPrincipalType.USER); if (user == null) return null; Preconditions.checkState(user instanceof User); return (User) user; } - private Principal removePrincipal(String principalName, TPrincipalType type) { + private @Nullable Principal removePrincipal(String principalName, TPrincipalType type) { versionLock_.writeLock().lock(); try { Principal principal = authPolicy_.removePrincipal(principalName, type); @@ -3659,8 +3692,8 @@ public class CatalogServiceCatalog extends Catalog { * matching privilege was found. Throws a CatalogException if no role exists with this * name. */ - public PrincipalPrivilege removeRolePrivilege(String roleName, String privilegeName) - throws CatalogException { + public @Nullable PrincipalPrivilege removeRolePrivilege( + String roleName, String privilegeName) throws CatalogException { return removePrincipalPrivilege(roleName, privilegeName, TPrincipalType.ROLE); } @@ -3670,12 +3703,12 @@ public class CatalogServiceCatalog extends Catalog { * matching privilege was found. Throws a CatalogException if no user exists with this * name. 
*/ - public PrincipalPrivilege removeUserPrivilege(String userName, String privilegeName) - throws CatalogException { + public @Nullable PrincipalPrivilege removeUserPrivilege( + String userName, String privilegeName) throws CatalogException { return removePrincipalPrivilege(userName, privilegeName, TPrincipalType.USER); } - private PrincipalPrivilege removePrincipalPrivilege(String principalName, + private @Nullable PrincipalPrivilege removePrincipalPrivilege(String principalName, String privilegeName, TPrincipalType type) throws CatalogException { versionLock_.writeLock().lock(); try { @@ -3717,7 +3750,7 @@ public class CatalogServiceCatalog extends Catalog { } @Override - public AuthzCacheInvalidation getAuthzCacheInvalidation(String markerName) { + public @Nullable AuthzCacheInvalidation getAuthzCacheInvalidation(String markerName) { versionLock_.readLock().lock(); try { return authzCacheInvalidation_.get(markerName); @@ -4309,22 +4342,21 @@ public class CatalogServiceCatalog extends Catalog { throws CatalogException { TCatalogObject objectDesc = Preconditions.checkNotNull(req.object_desc, "missing object_desc"); + String dbName = ""; switch (objectDesc.type) { case CATALOG: return getPartialCatalogInfo(req); case DATABASE: TDatabase dbDesc = Preconditions.checkNotNull(req.object_desc.db); - versionLock_.readLock().lock(); - try { - Db db = getDb(dbDesc.getDb_name()); + dbName = dbDesc.getDb_name(); + try (ReadLockAndLookupDb result = new ReadLockAndLookupDb(dbName)) { + Db db = result.getDb(); if (db == null) { return createGetPartialCatalogObjectError(req, CatalogLookupStatus.DB_NOT_FOUND); } return db.getPartialInfo(req); - } finally { - versionLock_.readLock().unlock(); } case TABLE: case VIEW: { @@ -4334,8 +4366,8 @@ public class CatalogServiceCatalog extends Catalog { long tableId = TABLE_ID_UNAVAILABLE; if (req.table_info_selector.valid_write_ids != null) { Preconditions.checkState(objectDesc.type.equals(TABLE)); - String dbName = 
objectDesc.getTable().db_name == null ? Catalog.DEFAULT_DB - : objectDesc.getTable().db_name; + dbName = objectDesc.getTable().db_name == null ? Catalog.DEFAULT_DB : + objectDesc.getTable().db_name; String tblName = objectDesc.getTable().tbl_name; writeIdList = MetastoreShim.getValidWriteIdListFromThrift( dbName + "." + tblName, req.table_info_selector.valid_write_ids); @@ -4377,9 +4409,9 @@ public class CatalogServiceCatalog extends Catalog { } } case FUNCTION: { - versionLock_.readLock().lock(); - try { - Db db = getDb(objectDesc.fn.name.db_name); + dbName = objectDesc.fn.name.db_name; + try (ReadLockAndLookupDb result = new ReadLockAndLookupDb(dbName)) { + Db db = result.getDb(); if (db == null) { return createGetPartialCatalogObjectError(req, CatalogLookupStatus.DB_NOT_FOUND); @@ -4395,8 +4427,6 @@ public class CatalogServiceCatalog extends Catalog { for (Function f : funcs) thriftFuncs.add(f.toThrift()); resp.setFunctions(thriftFuncs); return resp; - } finally { - versionLock_.readLock().unlock(); } } case DATA_SOURCE: { diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index d2a1be40f..38e946fbd 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -967,7 +967,7 @@ public class CatalogOpExecutor { "Unable to process rename table event " + eventId, e); } finally { UnlockWriteLockIfErronouslyLocked(); - if (beforeTblLocked) tblBefore.releaseWriteLock(); + if (tblBefore != null && beforeTblLocked) { tblBefore.releaseWriteLock(); } } } finally { getMetastoreDdlLock().unlock(); @@ -2614,14 +2614,8 @@ public class CatalogOpExecutor { } /** - * Returns the latest notification event id from the Hive metastore. 
- */ - private long getCurrentEventId(MetaStoreClient msClient) throws ImpalaRuntimeException { - return getCurrentEventId(msClient, null); - } - - /** - * Same as the above but also update the given 'catalogTimeline'. + * Returns the latest notification event id from the Hive metastore and update the given + * 'catalogTimeline'. */ private long getCurrentEventId(MetaStoreClient msClient, EventSequence catalogTimeline) throws ImpalaRuntimeException { @@ -2681,6 +2675,7 @@ public class CatalogOpExecutor { // For persistent Java functions we extract all supported function signatures from // the corresponding Jar and add each signature to the catalog. Preconditions.checkState(fn instanceof ScalarFunction); + Preconditions.checkNotNull(hiveJavaFunction); List<ScalarFunction> funcs = hiveJavaFunction.extract(); if (addJavaFunctionToHms(hiveJavaFunction.getHiveFunction(), params.if_not_exists, catalogTimeline)) { @@ -3046,7 +3041,7 @@ public class CatalogOpExecutor { uncacheTable(removedDb.getTable(tableName), catalogTimeline); } removedObject = removedDb.toTCatalogObject(); - if (authzConfig_.isEnabled()) { + if (authzConfig_.isEnabled() && db != null) { authzManager_.updateDatabaseOwnerPrivilege(params.server_name, dbName, db.getMetaStoreDb().getOwnerName(), db.getMetaStoreDb().getOwnerType(), /* newOwner */ null, /* newOwnerType */ null, resp); @@ -4041,6 +4036,7 @@ public class CatalogOpExecutor { Table newTbl = catalog_ .addIncompleteTable(newTable.getDbName(), newTable.getTableName(), TImpalaTableType.TABLE, params.getComment(), createEventId); + Preconditions.checkNotNull(newTbl); catalogTimeline.markEvent(CREATED_CATALOG_TABLE); LOG.debug("Created a Kudu table {} with create event id {}", newTbl.getFullName(), createEventId); @@ -4337,6 +4333,7 @@ public class CatalogOpExecutor { Table newTbl = catalog_.addIncompleteTable(newTable.getDbName(), newTable.getTableName(), TImpalaTableType.TABLE, tblComment, createEventId); + Preconditions.checkNotNull(newTbl); 
catalogTimeline.markEvent(CREATED_CATALOG_TABLE); LOG.debug("Created an iceberg table {} in catalog with create event Id {} ", newTbl.getFullName(), createEventId); @@ -5616,6 +5613,7 @@ public class CatalogOpExecutor { } // If cache pool name is not null, it indicates this partition should be cached. if (cachePoolName != null) { + Preconditions.checkNotNull(replication); long id = HdfsCachingUtil.submitCachePartitionDirective(partition, cachePoolName, replication); cacheIds.add(id); @@ -6267,7 +6265,7 @@ public class CatalogOpExecutor { if (ifExists || keys.containsAll(removeProperties)) { keys.removeAll(removeProperties); } else { - List<String> removeCopy = new ArrayList(removeProperties); + List<String> removeCopy = new ArrayList<>(removeProperties); removeCopy.removeAll(keys); throw new CatalogException( String.format("These properties do not exist for %s: %s.\n%s", @@ -6526,6 +6524,7 @@ public class CatalogOpExecutor { catalogTimeline); // Handle HDFS cache. if (cachePoolName != null) { + Preconditions.checkNotNull(replication); int numDone = 0; for (List<Partition> hmsSublist : Lists.partition(addedPartitions, MAX_PARTITION_UPDATES_PER_RPC)) { @@ -7385,6 +7384,7 @@ public class CatalogOpExecutor { // tbl lock is held at this point. 
if (partSpecList != null) { + Preconditions.checkNotNull(partValsList); boolean partitionChanged = false; for (int i = 0; i < partSpecList.size(); ++i) { HdfsTable hdfsTbl = (HdfsTable) tbl; @@ -7545,7 +7545,7 @@ public class CatalogOpExecutor { } } else { // For non-partitioned table, only single part exists - FeFsPartition singlePart = Iterables.getOnlyElement((List<FeFsPartition>) parts); + FeFsPartition singlePart = Iterables.getOnlyElement(parts); affectedExistingPartitions.add(singlePart); } unsetTableColStats(table.getMetaStoreTable(), tblTxn, catalogTimeline); diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java index aae49245d..eb1b8c561 100644 --- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java +++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java @@ -179,12 +179,12 @@ public class JniCatalog { ExternalEventsProcessor eventsProcessor = getEventsProcessor(metaStoreClientPool, catalogOpExecutor_); catalog_.setMetastoreEventProcessor(eventsProcessor); - catalog_.startEventsProcessor(); catalogMetastoreServer_ = getCatalogMetastoreServer(catalogOpExecutor_); catalog_.setCatalogMetastoreServer(catalogMetastoreServer_); catalogMetastoreServer_.start(); // catalog-server.cc is responsible to call catalog_.reset() for the first time. + // The first reset also will call startEventsProcessor(). 
} /** diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py index b149d2a44..ef91d3c56 100644 --- a/tests/custom_cluster/test_catalogd_ha.py +++ b/tests/custom_cluster/test_catalogd_ha.py @@ -551,7 +551,7 @@ class TestCatalogdHA(CustomClusterTestSuite): statestored_args="--use_subscriber_id_as_catalogd_priority=true", catalogd_args="--catalogd_ha_reset_metadata_on_failover=true " "--catalog_topic_mode=minimal " - "--debug_actions=TRIGGER_RESET_METADATA_DELAY:SLEEP@1000", + "--debug_actions=TRIGGER_RESET_METADATA_DELAY:SLEEP@3000", impalad_args="--use_local_catalog=true", start_args="--enable_catalogd_ha", disable_log_buffering=True)