Repository: incubator-impala Updated Branches: refs/heads/master 24869d40f -> a60ba6d27
IMPALA-4020: Handle external conflicting changes to HMS gracefully Currently, Catalog can't handle conflicting changes made to HMS databases by external clients while an invalidate metadata operation is running. For example, if a database is dropped by a client external to Impala while invalidate metadata is in progress, Catalog aborts the metadata load. This commit fixes the issue by handling the appropriate exceptions when HMS operations fail and skipping the load only for that particular database. Change-Id: Ic228efbcceb9ef6c165d0d9aeef7202581e3e46a Reviewed-on: http://gerrit.cloudera.org:8080/4161 Reviewed-by: Dimitris Tsirogiannis <[email protected]> Reviewed-by: Marcel Kornacker <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5f3ee38b Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5f3ee38b Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5f3ee38b Branch: refs/heads/master Commit: 5f3ee38bf4106fe4beb24e4f5237047decead3fe Parents: 24869d4 Author: Bharath Vissapragada <[email protected]> Authored: Mon Aug 29 12:13:37 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Thu Sep 1 04:15:04 2016 +0000 ---------------------------------------------------------------------- .../impala/catalog/CatalogServiceCatalog.java | 92 +++++++++++++------- 1 file changed, 61 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5f3ee38b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java index f0993bb..e8f5577 100644 --- 
a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java @@ -536,6 +536,60 @@ public class CatalogServiceCatalog extends Catalog { } } } + + /** + * Invalidates the database 'db'. This method can have potential race + * conditions with external changes to the Hive metastore and hence any + * conflicting changes to the objects can manifest in the form of exceptions + * from the HMS calls which are appropriately handled. Returns the invalidated + * 'Db' object along with list of tables to be loaded by the TableLoadingMgr. + * Returns null if the method encounters an exception during invalidation. + */ + private Pair<Db, List<TTableName>> invalidateDb( + MetaStoreClient msClient, String dbName, Db existingDb) { + try { + List<org.apache.hadoop.hive.metastore.api.Function> javaFns = + Lists.newArrayList(); + for (String javaFn: msClient.getHiveClient().getFunctions(dbName, "*")) { + javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn)); + } + org.apache.hadoop.hive.metastore.api.Database msDb = + msClient.getHiveClient().getDatabase(dbName); + Db newDb = new Db(dbName, this, msDb); + // existingDb is usually null when the Catalog loads for the first time. + // In that case we needn't restore any transient functions. + if (existingDb != null) { + // Restore UDFs that aren't persisted. They are only cleaned up on + // Catalog restart. + for (Function fn: existingDb.getTransientFunctions()) { + newDb.addFunction(fn); + fn.setCatalogVersion(incrementAndGetCatalogVersion()); + } + } + // Reload native UDFs. + loadFunctionsFromDbParams(newDb, msDb); + // Reload Java UDFs from HMS. 
+ loadJavaFunctions(newDb, javaFns); + newDb.setCatalogVersion(incrementAndGetCatalogVersion()); + + List<TTableName> tblsToBackgroundLoad = Lists.newArrayList(); + for (String tableName: msClient.getHiveClient().getAllTables(dbName)) { + Table incompleteTbl = IncompleteTable.createUninitializedTable( + getNextTableId(), newDb, tableName); + incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion()); + newDb.addTable(incompleteTbl); + if (loadInBackground_) { + tblsToBackgroundLoad.add(new TTableName(dbName, tableName.toLowerCase())); + } + } + return Pair.create(newDb, tblsToBackgroundLoad); + } catch (Exception e) { + LOG.warn("Encountered an exception while invalidating database: " + dbName + + ". Ignoring further load of this db.", e); + } + return null; + } + /** * Resets this catalog instance by clearing all cached table and database metadata. */ @@ -565,37 +619,13 @@ public class CatalogServiceCatalog extends Catalog { List<TTableName> tblsToBackgroundLoad = Lists.newArrayList(); try (MetaStoreClient msClient = getMetaStoreClient()) { for (String dbName: msClient.getHiveClient().getAllDatabases()) { - List<org.apache.hadoop.hive.metastore.api.Function> javaFns = - Lists.newArrayList(); - for (String javaFn: msClient.getHiveClient().getFunctions(dbName, "*")) { - javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn)); - } - org.apache.hadoop.hive.metastore.api.Database msDb = - msClient.getHiveClient().getDatabase(dbName); - Db db = new Db(dbName, this, msDb); - // Restore UDFs that aren't persisted. 
- Db oldDb = oldDbCache.get(db.getName().toLowerCase()); - if (oldDb != null) { - for (Function fn: oldDb.getTransientFunctions()) { - db.addFunction(fn); - fn.setCatalogVersion(incrementAndGetCatalogVersion()); - } - } - loadFunctionsFromDbParams(db, msDb); - loadJavaFunctions(db, javaFns); - db.setCatalogVersion(incrementAndGetCatalogVersion()); - newDbCache.put(db.getName().toLowerCase(), db); - - for (String tableName: msClient.getHiveClient().getAllTables(dbName)) { - Table incompleteTbl = IncompleteTable.createUninitializedTable( - getNextTableId(), db, tableName); - incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion()); - db.addTable(incompleteTbl); - if (loadInBackground_) { - tblsToBackgroundLoad.add( - new TTableName(dbName.toLowerCase(), tableName.toLowerCase())); - } - } + dbName = dbName.toLowerCase(); + Db oldDb = oldDbCache.get(dbName); + Pair<Db, List<TTableName>> invalidatedDb = invalidateDb(msClient, + dbName, oldDb); + if (invalidatedDb == null) continue; + newDbCache.put(dbName, invalidatedDb.first); + tblsToBackgroundLoad.addAll(invalidatedDb.second); } } dbCache_.set(newDbCache);
