IMPALA-5056: Ensure analysis uses 'fresh' catalog after metadata loading

If a query was waiting for metadata to be loaded when the statestore
restarted, it would never see that metadata, even after the statestore
had recovered and sent it.

This is because the AnalysisContext used for analysis was created once,
with a reference to an ImpaladCatalog, before the
analysis->metadata-load loop started. If the catalog sent a *complete*
update (which it does after a statestore restart), the ImpaladCatalog
was replaced rather than updated in place. The newly loaded tables
therefore existed in the new object, but not in the stale one that the
AnalysisContext still referenced.

The fix is to refresh the AnalysisContext's catalog on every iteration
of the loop, using the current ImpaladCatalog reference each time. That
way, if the ImpaladCatalog is replaced, the analysis phase sees the new
catalog on the next iteration.

Also fix a potential data race: the ImpaladCatalog reference was read
and written by the analysis and update threads without
synchronization. It is now held in an AtomicReference<ImpaladCatalog>.

Testing: manual. With this patch, queries complete after a statestore
restart; previously they spun forever, consuming 100% CPU.

Change-Id: I574d69fa75198499523dc291fbbd0d7e3d8d968f
Reviewed-on: http://gerrit.cloudera.org:8080/7045
Reviewed-by: Henry Robinson <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/54118010
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/54118010
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/54118010

Branch: refs/heads/master
Commit: 54118010590d8605303715bf570cac18e1d5e64e
Parents: 70657a8
Author: Henry Robinson <[email protected]>
Authored: Thu Jun 1 13:51:25 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jun 8 03:40:56 2017 +0000

----------------------------------------------------------------------
 .../apache/impala/analysis/AnalysisContext.java | 12 +++--
 .../org/apache/impala/service/Frontend.java     | 54 ++++++++++----------
 2 files changed, 37 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54118010/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java 
b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index c6ee814..34ac107 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -52,7 +52,7 @@ import com.google.common.collect.Maps;
  */
 public class AnalysisContext {
   private final static Logger LOG = 
LoggerFactory.getLogger(AnalysisContext.class);
-  private final ImpaladCatalog catalog_;
+  private ImpaladCatalog catalog_;
   private final TQueryCtx queryCtx_;
   private final AuthorizationConfig authzConfig_;
   private final ExprRewriter customRewriter_;
@@ -66,7 +66,7 @@ public class AnalysisContext {
 
   public AnalysisContext(ImpaladCatalog catalog, TQueryCtx queryCtx,
       AuthorizationConfig authzConfig) {
-    catalog_ = catalog;
+    setCatalog(catalog);
     queryCtx_ = queryCtx;
     authzConfig_ = authzConfig;
     customRewriter_ = null;
@@ -77,12 +77,18 @@ public class AnalysisContext {
    */
   protected AnalysisContext(ImpaladCatalog catalog, TQueryCtx queryCtx,
       AuthorizationConfig authzConfig, ExprRewriter rewriter) {
-    catalog_ = catalog;
+    setCatalog(catalog);
     queryCtx_ = queryCtx;
     authzConfig_ = authzConfig;
     customRewriter_ = rewriter;
   }
 
+  // Catalog may change between analysis attempts (e.g. when missing tables 
are loaded).
+  public void setCatalog(ImpaladCatalog catalog) {
+    Preconditions.checkNotNull(catalog);
+    catalog_ = catalog;
+  }
+
   static public class AnalysisResult {
     private StatementBase stmt_;
     private Analyzer analyzer_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54118010/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 0345260..3f4cc8a 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -160,7 +160,8 @@ public class Frontend {
   //TODO: Make the reload interval configurable.
   private static final int AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS = 5 * 60;
 
-  private ImpaladCatalog impaladCatalog_;
+  private AtomicReference<ImpaladCatalog> impaladCatalog_ =
+      new AtomicReference<ImpaladCatalog>();
   private final AuthorizationConfig authzConfig_;
   private final AtomicReference<AuthorizationChecker> authzChecker_;
   private final ScheduledExecutorService policyReader_ =
@@ -177,10 +178,10 @@ public class Frontend {
    */
   public Frontend(AuthorizationConfig authorizationConfig, ImpaladCatalog 
catalog) {
     authzConfig_ = authorizationConfig;
-    impaladCatalog_ = catalog;
+    impaladCatalog_.set(catalog);
     defaultKuduMasterHosts_ = catalog.getDefaultKuduMasterHosts();
     authzChecker_ = new AtomicReference<AuthorizationChecker>(
-        new AuthorizationChecker(authzConfig_, 
impaladCatalog_.getAuthPolicy()));
+        new AuthorizationChecker(authzConfig_, 
impaladCatalog_.get().getAuthPolicy()));
     // If authorization is enabled, reload the policy on a regular basis.
     if (authzConfig_.isEnabled() && authzConfig_.isFileBasedPolicy()) {
       // Stagger the reads across nodes
@@ -215,27 +216,25 @@ public class Frontend {
     }
   }
 
-  public ImpaladCatalog getCatalog() { return impaladCatalog_; }
+  public ImpaladCatalog getCatalog() { return impaladCatalog_.get(); }
   public AuthorizationChecker getAuthzChecker() { return authzChecker_.get(); }
 
   public TUpdateCatalogCacheResponse updateCatalogCache(
       TUpdateCatalogCacheRequest req) throws CatalogException {
-    ImpaladCatalog catalog = impaladCatalog_;
-
-    if (req.is_delta) return catalog.updateCatalog(req);
+    if (req.is_delta) return impaladCatalog_.get().updateCatalog(req);
 
     // If this is not a delta, this update should replace the current
     // Catalog contents so create a new catalog and populate it.
-    catalog = new ImpaladCatalog(defaultKuduMasterHosts_);
+    ImpaladCatalog catalog = new ImpaladCatalog(defaultKuduMasterHosts_);
 
     TUpdateCatalogCacheResponse response = catalog.updateCatalog(req);
 
     // Now that the catalog has been updated, replace the references to
-    // impaladCatalog_/authzChecker_. This ensures that clients don't see
-    // the catalog disappear.
-    impaladCatalog_ = catalog;
-    authzChecker_.set(new AuthorizationChecker(authzConfig_,
-        impaladCatalog_.getAuthPolicy()));
+    // impaladCatalog_/authzChecker_. This ensures that clients don't see the 
catalog
+    // disappear. The catalog is guaranteed to be ready since updateCatalog() 
has a
+    // postcondition of isReady() == true.
+    impaladCatalog_.set(catalog);
+    authzChecker_.set(new AuthorizationChecker(authzConfig_, 
catalog.getAuthPolicy()));
     return response;
   }
 
@@ -547,11 +546,12 @@ public class Frontend {
     // Get the destination for the load. If the load is targeting a partition,
     // this the partition location. Otherwise this is the table location.
     String destPathString = null;
+    ImpaladCatalog catalog = impaladCatalog_.get();
     if (request.isSetPartition_spec()) {
-      destPathString = impaladCatalog_.getHdfsPartition(tableName.getDb(),
+      destPathString = catalog.getHdfsPartition(tableName.getDb(),
           tableName.getTbl(), request.getPartition_spec()).getLocation();
     } else {
-      destPathString = impaladCatalog_.getTable(tableName.getDb(), 
tableName.getTbl())
+      destPathString = catalog.getTable(tableName.getDb(), tableName.getTbl())
           .getMetaStoreTable().getSd().getLocation();
     }
 
@@ -607,7 +607,7 @@ public class Frontend {
    */
   public List<String> getTableNames(String dbName, PatternMatcher matcher,
       User user) throws ImpalaException {
-    List<String> tblNames = impaladCatalog_.getTableNames(dbName, matcher);
+    List<String> tblNames = impaladCatalog_.get().getTableNames(dbName, 
matcher);
     if (authzConfig_.isEnabled()) {
       Iterator<String> iter = tblNames.iterator();
       while (iter.hasNext()) {
@@ -651,7 +651,7 @@ public class Frontend {
    */
   public List<Db> getDbs(PatternMatcher matcher, User user)
       throws InternalException {
-    List<Db> dbs = impaladCatalog_.getDbs(matcher);
+    List<Db> dbs = impaladCatalog_.get().getDbs(matcher);
     // If authorization is enabled, filter out the databases the user does not
     // have permissions on.
     if (authzConfig_.isEnabled()) {
@@ -683,7 +683,7 @@ public class Frontend {
    * matches all data sources.
    */
   public List<DataSource> getDataSrcs(String pattern) {
-    return impaladCatalog_.getDataSources(
+    return impaladCatalog_.get().getDataSources(
         PatternMatcher.createHivePatternMatcher(pattern));
   }
 
@@ -692,7 +692,7 @@ public class Frontend {
    */
   public TResultSet getColumnStats(String dbName, String tableName)
       throws ImpalaException {
-    Table table = impaladCatalog_.getTable(dbName, tableName);
+    Table table = impaladCatalog_.get().getTable(dbName, tableName);
     TResultSet result = new TResultSet();
     TResultSetMetadata resultSchema = new TResultSetMetadata();
     result.setSchema(resultSchema);
@@ -720,7 +720,7 @@ public class Frontend {
    */
   public TResultSet getTableStats(String dbName, String tableName, 
TShowStatsOp op)
       throws ImpalaException {
-    Table table = impaladCatalog_.getTable(dbName, tableName);
+    Table table = impaladCatalog_.get().getTable(dbName, tableName);
     if (table instanceof HdfsTable) {
       return ((HdfsTable) table).getTableStats();
     } else if (table instanceof HBaseTable) {
@@ -746,7 +746,7 @@ public class Frontend {
   public List<Function> getFunctions(TFunctionCategory category,
       String dbName, String fnPattern, boolean exactMatch)
       throws DatabaseNotFoundException {
-    Db db = impaladCatalog_.getDb(dbName);
+    Db db = impaladCatalog_.get().getDb(dbName);
     if (db == null) {
       throw new DatabaseNotFoundException("Database '" + dbName + "' not 
found");
     }
@@ -774,7 +774,7 @@ public class Frontend {
    */
   public TDescribeResult describeDb(String dbName, TDescribeOutputStyle 
outputStyle)
       throws ImpalaException {
-    Db db = impaladCatalog_.getDb(dbName);
+    Db db = impaladCatalog_.get().getDb(dbName);
     return DescribeResultFactory.buildDescribeDbResult(db, outputStyle);
   }
 
@@ -785,7 +785,7 @@ public class Frontend {
    */
   public TDescribeResult describeTable(TTableName tableName,
       TDescribeOutputStyle outputStyle) throws ImpalaException {
-    Table table = impaladCatalog_.getTable(tableName.db_name, 
tableName.table_name);
+    Table table = impaladCatalog_.get().getTable(tableName.db_name, 
tableName.table_name);
     if (outputStyle == TDescribeOutputStyle.MINIMAL) {
       return DescribeResultFactory.buildDescribeMinimalResult(table);
     } else {
@@ -882,12 +882,12 @@ public class Frontend {
    */
   private AnalysisContext.AnalysisResult analyzeStmt(TQueryCtx queryCtx)
       throws AnalysisException, InternalException, AuthorizationException {
-    if (!impaladCatalog_.isReady()) {
+    if (!impaladCatalog_.get().isReady()) {
       throw new AnalysisException("This Impala daemon is not ready to accept 
user " +
           "requests. Status: Waiting for catalog update from the StateStore.");
     }
 
-    AnalysisContext analysisCtx = new AnalysisContext(impaladCatalog_, 
queryCtx,
+    AnalysisContext analysisCtx = new AnalysisContext(impaladCatalog_.get(), 
queryCtx,
         authzConfig_);
     LOG.info("Compiling query: " + queryCtx.client_request.stmt);
 
@@ -897,6 +897,8 @@ public class Frontend {
     // 3) Analysis fails with an AuthorizationException.
     try {
       while (true) {
+        // Ensure that catalog snapshot reflects any recent changes.
+        analysisCtx.setCatalog(impaladCatalog_.get());
         try {
           analysisCtx.analyze(queryCtx.client_request.stmt);
           
Preconditions.checkState(analysisCtx.getAnalyzer().getMissingTbls().isEmpty());
@@ -1214,7 +1216,7 @@ public class Frontend {
    */
   public TResultSet getTableFiles(TShowFilesParams request)
       throws ImpalaException{
-    Table table = 
impaladCatalog_.getTable(request.getTable_name().getDb_name(),
+    Table table = 
impaladCatalog_.get().getTable(request.getTable_name().getDb_name(),
         request.getTable_name().getTable_name());
     if (table instanceof HdfsTable) {
       return ((HdfsTable) table).getFiles(request.getPartition_set());

Reply via email to