IMPALA-5056: Ensure analysis uses 'fresh' catalog after metadata loading

If a query was waiting for metadata to be loaded at the same time as the statestore restarted, it would never see the newly loaded metadata, even after the statestore had recovered and the catalog had been resent.
This is because the AnalysisContext used for analysis is created only once, with a reference to an ImpaladCatalog, before the analysis->metadata-load loop starts. If the catalog sent a *complete* update (which it does after a statestore restart), the ImpaladCatalog would be replaced rather than updated in place. The newly loaded tables would then exist in the new ImpaladCatalog object, but not in the old one that the AnalysisContext still referenced, so analysis kept reporting the tables as missing.

The fix is to refresh the AnalysisContext's catalog on every iteration of the loop, taking the most recent reference to the ImpaladCatalog each time. That way, if the ImpaladCatalog is replaced, the next iteration's analysis phase sees the new catalog and its tables.

Also fix a potential bug where access to the ImpaladCatalog reference was not synchronized between the update and analysis threads. The field is now an AtomicReference<ImpaladCatalog>, so analysis threads reliably observe the latest catalog published by the update thread.

Testing: manual - with this patch, queries complete after statestore restarts, whereas previously they consumed 100% CPU spinning forever.

Change-Id: I574d69fa75198499523dc291fbbd0d7e3d8d968f Reviewed-on: http://gerrit.cloudera.org:8080/7045 Reviewed-by: Henry Robinson <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/54118010 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/54118010 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/54118010 Branch: refs/heads/master Commit: 54118010590d8605303715bf570cac18e1d5e64e Parents: 70657a8 Author: Henry Robinson <[email protected]> Authored: Thu Jun 1 13:51:25 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Jun 8 03:40:56 2017 +0000 ---------------------------------------------------------------------- .../apache/impala/analysis/AnalysisContext.java | 12 +++-- .../org/apache/impala/service/Frontend.java | 54 ++++++++++---------- 2 files changed, 37 insertions(+), 29 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54118010/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java index c6ee814..34ac107 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java +++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java @@ -52,7 +52,7 @@ import com.google.common.collect.Maps; */ public class AnalysisContext { private final static Logger LOG = LoggerFactory.getLogger(AnalysisContext.class); - private final ImpaladCatalog catalog_; + private ImpaladCatalog catalog_; private final TQueryCtx queryCtx_; private final AuthorizationConfig authzConfig_; private final ExprRewriter customRewriter_; @@ -66,7 +66,7 @@ public class AnalysisContext { public AnalysisContext(ImpaladCatalog catalog, TQueryCtx queryCtx, AuthorizationConfig authzConfig) { - catalog_ = catalog; + setCatalog(catalog); queryCtx_ = queryCtx; authzConfig_ = authzConfig; customRewriter_ = null; @@ -77,12 +77,18 @@ public class AnalysisContext { */ protected AnalysisContext(ImpaladCatalog catalog, TQueryCtx queryCtx, AuthorizationConfig authzConfig, ExprRewriter rewriter) { - catalog_ = catalog; + setCatalog(catalog); queryCtx_ = queryCtx; authzConfig_ = authzConfig; customRewriter_ = rewriter; } + // Catalog may change between analysis attempts (e.g. when missing tables are loaded). 
+ public void setCatalog(ImpaladCatalog catalog) { + Preconditions.checkNotNull(catalog); + catalog_ = catalog; + } + static public class AnalysisResult { private StatementBase stmt_; private Analyzer analyzer_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54118010/fe/src/main/java/org/apache/impala/service/Frontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 0345260..3f4cc8a 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -160,7 +160,8 @@ public class Frontend { //TODO: Make the reload interval configurable. private static final int AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS = 5 * 60; - private ImpaladCatalog impaladCatalog_; + private AtomicReference<ImpaladCatalog> impaladCatalog_ = + new AtomicReference<ImpaladCatalog>(); private final AuthorizationConfig authzConfig_; private final AtomicReference<AuthorizationChecker> authzChecker_; private final ScheduledExecutorService policyReader_ = @@ -177,10 +178,10 @@ public class Frontend { */ public Frontend(AuthorizationConfig authorizationConfig, ImpaladCatalog catalog) { authzConfig_ = authorizationConfig; - impaladCatalog_ = catalog; + impaladCatalog_.set(catalog); defaultKuduMasterHosts_ = catalog.getDefaultKuduMasterHosts(); authzChecker_ = new AtomicReference<AuthorizationChecker>( - new AuthorizationChecker(authzConfig_, impaladCatalog_.getAuthPolicy())); + new AuthorizationChecker(authzConfig_, impaladCatalog_.get().getAuthPolicy())); // If authorization is enabled, reload the policy on a regular basis. 
if (authzConfig_.isEnabled() && authzConfig_.isFileBasedPolicy()) { // Stagger the reads across nodes @@ -215,27 +216,25 @@ public class Frontend { } } - public ImpaladCatalog getCatalog() { return impaladCatalog_; } + public ImpaladCatalog getCatalog() { return impaladCatalog_.get(); } public AuthorizationChecker getAuthzChecker() { return authzChecker_.get(); } public TUpdateCatalogCacheResponse updateCatalogCache( TUpdateCatalogCacheRequest req) throws CatalogException { - ImpaladCatalog catalog = impaladCatalog_; - - if (req.is_delta) return catalog.updateCatalog(req); + if (req.is_delta) return impaladCatalog_.get().updateCatalog(req); // If this is not a delta, this update should replace the current // Catalog contents so create a new catalog and populate it. - catalog = new ImpaladCatalog(defaultKuduMasterHosts_); + ImpaladCatalog catalog = new ImpaladCatalog(defaultKuduMasterHosts_); TUpdateCatalogCacheResponse response = catalog.updateCatalog(req); // Now that the catalog has been updated, replace the references to - // impaladCatalog_/authzChecker_. This ensures that clients don't see - // the catalog disappear. - impaladCatalog_ = catalog; - authzChecker_.set(new AuthorizationChecker(authzConfig_, - impaladCatalog_.getAuthPolicy())); + // impaladCatalog_/authzChecker_. This ensures that clients don't see the catalog + // disappear. The catalog is guaranteed to be ready since updateCatalog() has a + // postcondition of isReady() == true. + impaladCatalog_.set(catalog); + authzChecker_.set(new AuthorizationChecker(authzConfig_, catalog.getAuthPolicy())); return response; } @@ -547,11 +546,12 @@ public class Frontend { // Get the destination for the load. If the load is targeting a partition, // this the partition location. Otherwise this is the table location. 
String destPathString = null; + ImpaladCatalog catalog = impaladCatalog_.get(); if (request.isSetPartition_spec()) { - destPathString = impaladCatalog_.getHdfsPartition(tableName.getDb(), + destPathString = catalog.getHdfsPartition(tableName.getDb(), tableName.getTbl(), request.getPartition_spec()).getLocation(); } else { - destPathString = impaladCatalog_.getTable(tableName.getDb(), tableName.getTbl()) + destPathString = catalog.getTable(tableName.getDb(), tableName.getTbl()) .getMetaStoreTable().getSd().getLocation(); } @@ -607,7 +607,7 @@ public class Frontend { */ public List<String> getTableNames(String dbName, PatternMatcher matcher, User user) throws ImpalaException { - List<String> tblNames = impaladCatalog_.getTableNames(dbName, matcher); + List<String> tblNames = impaladCatalog_.get().getTableNames(dbName, matcher); if (authzConfig_.isEnabled()) { Iterator<String> iter = tblNames.iterator(); while (iter.hasNext()) { @@ -651,7 +651,7 @@ public class Frontend { */ public List<Db> getDbs(PatternMatcher matcher, User user) throws InternalException { - List<Db> dbs = impaladCatalog_.getDbs(matcher); + List<Db> dbs = impaladCatalog_.get().getDbs(matcher); // If authorization is enabled, filter out the databases the user does not // have permissions on. if (authzConfig_.isEnabled()) { @@ -683,7 +683,7 @@ public class Frontend { * matches all data sources. 
*/ public List<DataSource> getDataSrcs(String pattern) { - return impaladCatalog_.getDataSources( + return impaladCatalog_.get().getDataSources( PatternMatcher.createHivePatternMatcher(pattern)); } @@ -692,7 +692,7 @@ public class Frontend { */ public TResultSet getColumnStats(String dbName, String tableName) throws ImpalaException { - Table table = impaladCatalog_.getTable(dbName, tableName); + Table table = impaladCatalog_.get().getTable(dbName, tableName); TResultSet result = new TResultSet(); TResultSetMetadata resultSchema = new TResultSetMetadata(); result.setSchema(resultSchema); @@ -720,7 +720,7 @@ public class Frontend { */ public TResultSet getTableStats(String dbName, String tableName, TShowStatsOp op) throws ImpalaException { - Table table = impaladCatalog_.getTable(dbName, tableName); + Table table = impaladCatalog_.get().getTable(dbName, tableName); if (table instanceof HdfsTable) { return ((HdfsTable) table).getTableStats(); } else if (table instanceof HBaseTable) { @@ -746,7 +746,7 @@ public class Frontend { public List<Function> getFunctions(TFunctionCategory category, String dbName, String fnPattern, boolean exactMatch) throws DatabaseNotFoundException { - Db db = impaladCatalog_.getDb(dbName); + Db db = impaladCatalog_.get().getDb(dbName); if (db == null) { throw new DatabaseNotFoundException("Database '" + dbName + "' not found"); } @@ -774,7 +774,7 @@ public class Frontend { */ public TDescribeResult describeDb(String dbName, TDescribeOutputStyle outputStyle) throws ImpalaException { - Db db = impaladCatalog_.getDb(dbName); + Db db = impaladCatalog_.get().getDb(dbName); return DescribeResultFactory.buildDescribeDbResult(db, outputStyle); } @@ -785,7 +785,7 @@ public class Frontend { */ public TDescribeResult describeTable(TTableName tableName, TDescribeOutputStyle outputStyle) throws ImpalaException { - Table table = impaladCatalog_.getTable(tableName.db_name, tableName.table_name); + Table table = 
impaladCatalog_.get().getTable(tableName.db_name, tableName.table_name); if (outputStyle == TDescribeOutputStyle.MINIMAL) { return DescribeResultFactory.buildDescribeMinimalResult(table); } else { @@ -882,12 +882,12 @@ public class Frontend { */ private AnalysisContext.AnalysisResult analyzeStmt(TQueryCtx queryCtx) throws AnalysisException, InternalException, AuthorizationException { - if (!impaladCatalog_.isReady()) { + if (!impaladCatalog_.get().isReady()) { throw new AnalysisException("This Impala daemon is not ready to accept user " + "requests. Status: Waiting for catalog update from the StateStore."); } - AnalysisContext analysisCtx = new AnalysisContext(impaladCatalog_, queryCtx, + AnalysisContext analysisCtx = new AnalysisContext(impaladCatalog_.get(), queryCtx, authzConfig_); LOG.info("Compiling query: " + queryCtx.client_request.stmt); @@ -897,6 +897,8 @@ public class Frontend { // 3) Analysis fails with an AuthorizationException. try { while (true) { + // Ensure that catalog snapshot reflects any recent changes. + analysisCtx.setCatalog(impaladCatalog_.get()); try { analysisCtx.analyze(queryCtx.client_request.stmt); Preconditions.checkState(analysisCtx.getAnalyzer().getMissingTbls().isEmpty()); @@ -1214,7 +1216,7 @@ public class Frontend { */ public TResultSet getTableFiles(TShowFilesParams request) throws ImpalaException{ - Table table = impaladCatalog_.getTable(request.getTable_name().getDb_name(), + Table table = impaladCatalog_.get().getTable(request.getTable_name().getDb_name(), request.getTable_name().getTable_name()); if (table instanceof HdfsTable) { return ((HdfsTable) table).getFiles(request.getPartition_set());
