IMPALA-4278: Don't abort Catalog startup quickly if HMS is not present

This change introduces a new catalogd startup option
(--initial_hms_cnxn_timeout_s) that specifies how long, in seconds,
catalogd keeps retrying to establish its initial connection to the
HMS on startup before giving up and exiting with a fatal error.

Setting this startup option to a value greater than the HMS startup
time allows CM to start Impala at the same time as the HMS, or even
before it.

The default value of initial_hms_cnxn_timeout_s is 120 seconds; the
value must be greater than 0.

Change-Id: I546d8fe9836004832ae40110c9fe22b3e704e11b
Reviewed-on: http://gerrit.cloudera.org:8080/5095
Reviewed-by: Henry Robinson <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/60414f06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/60414f06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/60414f06

Branch: refs/heads/master
Commit: 60414f06330aa62d06a67821309c8ff452de5a5f
Parents: 76719de
Author: Attila Jeges <[email protected]>
Authored: Tue Oct 25 21:59:35 2016 +0200
Committer: Internal Jenkins <[email protected]>
Committed: Fri Nov 18 03:12:12 2016 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog.cc                       |   3 +
 be/src/util/backend-gflag-util.cc               |   2 +
 common/thrift/BackendGflags.thrift              |   2 +
 .../java/org/apache/impala/catalog/Catalog.java |  36 ++--
 .../impala/catalog/CatalogServiceCatalog.java   |  14 +-
 .../apache/impala/catalog/ImpaladCatalog.java   |   2 +-
 .../impala/catalog/MetaStoreClientPool.java     |  65 +++++--
 .../org/apache/impala/service/JniCatalog.java   |  13 +-
 .../testutil/CatalogServiceTestCatalog.java     |   9 +-
 tests/experiments/test_catalog_hms_failures.py  | 188 +++++++++++++++++++
 10 files changed, 285 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index b3690f6..3ed8d7f 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -35,6 +35,9 @@ DEFINE_bool(load_catalog_in_background, false,
 DEFINE_int32(num_metadata_loading_threads, 16,
     "(Advanced) The number of metadata loading threads (degree of parallelism) 
to use "
     "when loading catalog metadata.");
+DEFINE_int32(initial_hms_cnxn_timeout_s, 120,
+    "Number of seconds catalogd will wait to establish an initial connection 
to the HMS "
+    "before exiting.");
 DEFINE_string(sentry_config, "", "Local path to a sentry-site.xml 
configuration "
     "file. If set, authorization will be enabled.");
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index f504e57..da62b61 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -29,6 +29,7 @@ DECLARE_bool(load_auth_to_local_rules);
 DECLARE_int32(non_impala_java_vlog);
 DECLARE_int32(read_size);
 DECLARE_int32(num_metadata_loading_threads);
+DECLARE_int32(initial_hms_cnxn_timeout_s);
 DECLARE_int32(kudu_operation_timeout_ms);
 DECLARE_int64(inc_stats_size_limit_bytes);
 DECLARE_string(principal);
@@ -57,6 +58,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* 
cfg_bytes) {
   cfg.__set_kudu_master_hosts(FLAGS_kudu_master_hosts);
   cfg.__set_read_size(FLAGS_read_size);
   cfg.__set_num_metadata_loading_threads(FLAGS_num_metadata_loading_threads);
+  cfg.__set_initial_hms_cnxn_timeout_s(FLAGS_initial_hms_cnxn_timeout_s);
   cfg.__set_sentry_config(FLAGS_sentry_config);
   // auth_to_local rules are read if --load_auth_to_local_rules is set to true
   // and impala is kerberized.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index ad081a6..09cf6f7 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -52,4 +52,6 @@ struct TBackendGflags {
   15: required i32 read_size
 
   16: required i32 kudu_operation_timeout_ms
+
+  17: required i32 initial_hms_cnxn_timeout_s
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/fe/src/main/java/org/apache/impala/catalog/Catalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java 
b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 733b2f2..403b9c1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -17,14 +17,16 @@
 
 package org.apache.impala.catalog;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.log4j.Logger;
-
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.thrift.TCatalogObject;
@@ -32,9 +34,7 @@ import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.PatternMatcher;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import org.apache.log4j.Logger;
 
 /**
  * Thread safe interface for reading and updating metadata stored in the Hive 
MetaStore.
@@ -60,11 +60,10 @@ public abstract class Catalog {
   // Initial catalog version.
   public final static long INITIAL_CATALOG_VERSION = 0L;
   public static final String DEFAULT_DB = "default";
-  private static final int META_STORE_CLIENT_POOL_SIZE = 10;
-
   public static final String BUILTINS_DB = "_impala_builtins";
 
-  protected final MetaStoreClientPool metaStoreClientPool_ = new 
MetaStoreClientPool(0);
+  protected final MetaStoreClientPool metaStoreClientPool_ =
+      new MetaStoreClientPool(0, 0);
 
   // Cache of authorization policy metadata. Populated from data retried from 
the
   // Sentry Service, if configured.
@@ -88,19 +87,24 @@ public abstract class Catalog {
   protected final CatalogObjectCache<HdfsCachePool> hdfsCachePools_ =
       new CatalogObjectCache<HdfsCachePool>(false);
 
-  /**
-   * Creates a new instance of a Catalog. If initMetastoreClientPool is true, 
will
-   * also add META_STORE_CLIENT_POOL_SIZE clients to metastoreClientPool_.
-   */
-  public Catalog(boolean initMetastoreClientPool) {
-    if (initMetastoreClientPool) {
-      metaStoreClientPool_.addClients(META_STORE_CLIENT_POOL_SIZE);
-    }
+  public Catalog() {
     dataSources_ = new CatalogObjectCache<DataSource>();
     builtinsDb_ = new BuiltinsDb(BUILTINS_DB, this);
     addDb(builtinsDb_);
   }
 
+  /**
+   * Creates a new instance of Catalog. It also adds 'numClients' clients to
+   * 'metastoreClientPool_'.
+   * 'initialCnxnTimeoutSec' specifies the time (in seconds) Catalog will wait 
to
+   * establish an initial connection to the HMS. Using this setting allows 
catalogd and
+   * HMS to be started simultaneously.
+   */
+  public Catalog(int numClients, int initialCnxnTimeoutSec) {
+    this();
+    metaStoreClientPool_.initClients(numClients, initialCnxnTimeoutSec);
+  }
+
   public Db getBuiltinsDb() { return builtinsDb_; }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 203ef6d..7997412 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -107,6 +107,7 @@ import com.google.common.collect.Sets;
 public class CatalogServiceCatalog extends Catalog {
   private static final Logger LOG = 
Logger.getLogger(CatalogServiceCatalog.class);
 
+  private static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10;
   private final TUniqueId catalogServiceId_;
 
   // Fair lock used to synchronize reads/writes of catalogVersion_. Because 
this lock
@@ -148,13 +149,16 @@ public class CatalogServiceCatalog extends Catalog {
   private static String localLibraryPath_;
 
   /**
-   * Initialize the CatalogServiceCatalog. If loadInBackground is true, table 
metadata
-   * will be loaded in the background
+   * Initialize the CatalogServiceCatalog. If 'loadInBackground' is true, 
table metadata
+   * will be loaded in the background. 'initialHmsCnxnTimeoutSec' specifies 
the time (in
+   * seconds) CatalogServiceCatalog will wait to establish an initial 
connection to the
+   * HMS before giving up. Using this setting allows catalogd and HMS to be 
started
+   * simultaneously.
    */
   public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
-      SentryConfig sentryConfig, TUniqueId catalogServiceId, String 
kerberosPrincipal,
-      String localLibraryPath) {
-    super(true);
+      int initialHmsCnxnTimeoutSec, SentryConfig sentryConfig, TUniqueId 
catalogServiceId,
+      String kerberosPrincipal, String localLibraryPath) {
+    super(INITIAL_META_STORE_CLIENT_POOL_SIZE, initialHmsCnxnTimeoutSec);
     catalogServiceId_ = catalogServiceId;
     tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads);
     loadInBackground_ = loadInBackground;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 3647256..a59f997 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -96,7 +96,7 @@ public class ImpaladCatalog extends Catalog {
    * CatalogServer.
    */
   public ImpaladCatalog(String defaultKuduMasterHosts) {
-    super(false);
+    super();
     defaultKuduMasterHosts_ = defaultKuduMasterHosts;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/fe/src/main/java/org/apache/impala/catalog/MetaStoreClientPool.java
----------------------------------------------------------------------
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/MetaStoreClientPool.java 
b/fe/src/main/java/org/apache/impala/catalog/MetaStoreClientPool.java
index 372fe37..29e5df9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/MetaStoreClientPool.java
+++ b/fe/src/main/java/org/apache/impala/catalog/MetaStoreClientPool.java
@@ -18,6 +18,7 @@
 package org.apache.impala.catalog;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
@@ -72,15 +73,38 @@ public class MetaStoreClientPool {
     private final IMetaStoreClient hiveClient_;
     private boolean isInUse_;
 
-    private MetaStoreClient(HiveConf hiveConf) {
-      try {
-        LOG.debug("Creating MetaStoreClient. Pool Size = " + 
clientPool_.size());
-        hiveClient_ = RetryingMetaStoreClient.getProxy(hiveConf, 
dummyHookLoader,
-            HiveMetaStoreClient.class.getName());
-      } catch (Exception e) {
-        // Turn in to an unchecked exception
-        throw new IllegalStateException(e);
+    /**
+     * Creates a new instance of MetaStoreClient.
+     * 'cnxnTimeoutSec' specifies the time MetaStoreClient will wait to 
establish first
+     * connection to the HMS before giving up and failing out with an 
exception.
+     */
+    private MetaStoreClient(HiveConf hiveConf, int cnxnTimeoutSec) {
+      LOG.debug("Creating MetaStoreClient. Pool Size = " + clientPool_.size());
+
+      long retryDelaySeconds = hiveConf.getTimeVar(
+          HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, 
TimeUnit.SECONDS);
+      long retryDelayMillis = retryDelaySeconds * 1000;
+      long endTimeMillis = System.currentTimeMillis() + cnxnTimeoutSec * 1000;
+      IMetaStoreClient hiveClient = null;
+      while (true) {
+        try {
+          hiveClient = RetryingMetaStoreClient.getProxy(hiveConf, 
dummyHookLoader,
+              HiveMetaStoreClient.class.getName());
+          break;
+        } catch (Exception e) {
+          // If time is up, throw an unchecked exception
+          long delayUntilMillis = System.currentTimeMillis() + 
retryDelayMillis;
+          if (delayUntilMillis >= endTimeMillis) throw new 
IllegalStateException(e);
+
+          LOG.warn("Failed to connect to Hive MetaStore. Retrying.", e);
+          while (delayUntilMillis > System.currentTimeMillis()) {
+            try {
+              Thread.sleep(delayUntilMillis - System.currentTimeMillis());
+            } catch (InterruptedException | IllegalArgumentException ignore) {}
+          }
+        }
       }
+      hiveClient_ = hiveClient;
       isInUse_ = false;
     }
 
@@ -119,23 +143,30 @@ public class MetaStoreClientPool {
     }
   }
 
-  public MetaStoreClientPool(int initialSize) {
-    this(initialSize, new HiveConf(MetaStoreClientPool.class));
+  public MetaStoreClientPool(int initialSize, int initialCnxnTimeoutSec) {
+    this(initialSize, initialCnxnTimeoutSec, new 
HiveConf(MetaStoreClientPool.class));
   }
 
-  public MetaStoreClientPool(int initialSize, HiveConf hiveConf) {
+  public MetaStoreClientPool(int initialSize, int initialCnxnTimeoutSec,
+      HiveConf hiveConf) {
     hiveConf_ = hiveConf;
     clientCreationDelayMs_ = 
hiveConf_.getInt(HIVE_METASTORE_CNXN_DELAY_MS_CONF,
         DEFAULT_HIVE_METASTORE_CNXN_DELAY_MS_CONF);
-    addClients(initialSize);
+    initClients(initialSize, initialCnxnTimeoutSec);
   }
 
   /**
-   * Add numClients to the client pool.
+   * Initialize client pool with 'numClients' client.
+   * 'initialCnxnTimeoutSec' specifies the time (in seconds) the first client 
will wait to
+   * establish an initial connection to the HMS.
    */
-  public void addClients(int numClients) {
-    for (int i = 0; i < numClients; ++i) {
-      clientPool_.add(new MetaStoreClient(hiveConf_));
+  public void initClients(int numClients, int initialCnxnTimeoutSec) {
+    Preconditions.checkState(clientPool_.size() == 0);
+    if (numClients > 0) {
+      clientPool_.add(new MetaStoreClient(hiveConf_, initialCnxnTimeoutSec));
+      for (int i = 0; i < numClients - 1; ++i) {
+        clientPool_.add(new MetaStoreClient(hiveConf_, 0));
+      }
     }
   }
 
@@ -163,7 +194,7 @@ public class MetaStoreClientPool {
         } catch (InterruptedException e) {
           /* ignore */
         }
-        client = new MetaStoreClient(hiveConf_);
+        client = new MetaStoreClient(hiveConf_, 0);
       }
     }
     client.markInUse();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java 
b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index db9ba63..b756230 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -17,6 +17,10 @@
 
 package org.apache.impala.service;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -56,10 +60,6 @@ import org.apache.thrift.protocol.TBinaryProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
 /**
  * JNI-callable interface for the CatalogService. The main point is to 
serialize
  * and de-serialize thrift structures between C and Java parts of the 
CatalogService.
@@ -87,6 +87,7 @@ public class JniCatalog {
     BackendConfig.create(cfg);
 
     Preconditions.checkArgument(cfg.num_metadata_loading_threads > 0);
+    Preconditions.checkArgument(cfg.initial_hms_cnxn_timeout_s > 0);
     // This trick saves having to pass a TLogLevel enum, which is an object 
and more
     // complex to pass through JNI.
     GlogAppender.Install(TLogLevel.values()[cfg.impala_log_lvl],
@@ -101,8 +102,8 @@ public class JniCatalog {
     LOG.info(JniUtil.getJavaVersion());
 
     catalog_ = new CatalogServiceCatalog(cfg.load_catalog_in_background,
-        cfg.num_metadata_loading_threads, sentryConfig, getServiceId(), 
cfg.principal,
-        cfg.local_library_path);
+        cfg.num_metadata_loading_threads, cfg.initial_hms_cnxn_timeout_s, 
sentryConfig,
+        getServiceId(), cfg.principal, cfg.local_library_path);
     try {
       catalog_.reset();
     } catch (CatalogException e) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
----------------------------------------------------------------------
diff --git 
a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java 
b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index c364ba5..93e4af0 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -30,9 +30,10 @@ import org.apache.impala.thrift.TUniqueId;
 public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
 
   public CatalogServiceTestCatalog(boolean loadInBackground, int 
numLoadingThreads,
-      SentryConfig sentryConfig, TUniqueId catalogServiceId) {
-    super(loadInBackground, numLoadingThreads, sentryConfig, catalogServiceId, 
null,
-        System.getProperty("java.io.tmpdir"));
+      int initialHmsCnxnTimeoutSec, SentryConfig sentryConfig,
+      TUniqueId catalogServiceId) {
+    super(loadInBackground, numLoadingThreads, initialHmsCnxnTimeoutSec, 
sentryConfig,
+        catalogServiceId, null, System.getProperty("java.io.tmpdir"));
 
     // Cache pools are typically loaded asynchronously, but as there is no 
fixed execution
     // order for tests, the cache pools are loaded synchronously before the 
tests are
@@ -51,7 +52,7 @@ public class CatalogServiceTestCatalog extends 
CatalogServiceCatalog {
    */
   public static CatalogServiceCatalog createWithAuth(SentryConfig config) {
     CatalogServiceCatalog cs =
-        new CatalogServiceTestCatalog(false, 16, config, new TUniqueId());
+        new CatalogServiceTestCatalog(false, 16, 0, config, new TUniqueId());
     try {
       cs.reset();
     } catch (CatalogException e) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/tests/experiments/test_catalog_hms_failures.py
----------------------------------------------------------------------
diff --git a/tests/experiments/test_catalog_hms_failures.py 
b/tests/experiments/test_catalog_hms_failures.py
new file mode 100644
index 0000000..efa4fed
--- /dev/null
+++ b/tests/experiments/test_catalog_hms_failures.py
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Test Catalog behavior when HMS is not present
+
+import os
+import pytest
+from subprocess import check_call
+import time
+
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.custom_cluster_test_suite import (
+    CustomClusterTestSuite,
+    NUM_SUBSCRIBERS)
+from tests.util.filesystem_utils import IS_ISILON, IS_LOCAL
+
+class TestCatalogHMSFailures(CustomClusterTestSuite):
+  @classmethod
+  def setup_class(cls):
+    super(TestCatalogHMSFailures, cls).setup_class()
+
+  @classmethod
+  def run_hive_server(cls):
+    script = os.path.join(os.environ['IMPALA_HOME'], 
'testdata/bin/run-hive-server.sh')
+    run_cmd = [script]
+    if IS_LOCAL or IS_ISILON:
+      run_cmd.append('-only_metastore')
+    check_call(run_cmd, close_fds=True)
+
+  @classmethod
+  def cleanup_process(cls, proc):
+    try:
+      proc.kill()
+    except:
+      pass
+    try:
+      proc.wait()
+    except:
+      pass
+
+  @classmethod
+  def teardown_class(cls):
+    # Make sure the metastore is running even if the test aborts somewhere 
unexpected
+    # before restarting the metastore itself.
+    cls.run_hive_server()
+    super(TestCatalogHMSFailures, cls).teardown_class()
+
+  @classmethod
+  def reload_metadata(cls, client):
+    client.execute('invalidate metadata')
+    client.execute('show databases')
+
+  @pytest.mark.execute_serially
+  
@CustomClusterTestSuite.with_args(catalogd_args='--initial_hms_cnxn_timeout_s=120')
+  def test_kill_hms_after_catalog_init(self, vector):
+    """IMPALA-4278: If HMS dies after catalogd initialization, SQL statements 
that force
+    metadata load should fail quickly. After HMS restart, metadata load should 
work
+    again"""
+    # Make sure that catalogd is connected to HMS
+    impalad = self.cluster.get_any_impalad()
+    client = impalad.service.create_beeswax_client()
+    self.reload_metadata(client)
+
+    # Kill Hive
+    kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 
'testdata/bin/kill-hive-server.sh')
+    check_call([kill_cmd], close_fds=True)
+
+    # Metadata load should fail quickly
+    start = time.time()
+    try:
+      self.reload_metadata(client)
+    except ImpalaBeeswaxException as e:
+      assert "Connection refused" in str(e)
+    else:
+      assert False, "Metadata load should have failed"
+    end = time.time()
+    assert end - start < 30, "Metadata load hasn't failed quickly enough"
+
+    # Start Hive
+    self.run_hive_server()
+
+    # Metadata load should work now
+    self.reload_metadata(client)
+
+  @pytest.mark.execute_serially
+  
@CustomClusterTestSuite.with_args(catalogd_args='--initial_hms_cnxn_timeout_s=120')
+  def test_start_catalog_before_hms(self, vector):
+    """IMPALA-4278: If catalogd is started with initial_hms_cnxn_timeout_s set 
to a value
+    greater than HMS startup time, it will manage to establish connection to 
HMS even if
+    HMS is started a little later"""
+    # Make sure that catalogd is connected to HMS
+    impalad = self.cluster.get_any_impalad()
+    client = impalad.service.create_beeswax_client()
+    self.reload_metadata(client)
+
+    # Kill Hive
+    kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 
'testdata/bin/kill-hive-server.sh')
+    check_call([kill_cmd], close_fds=True)
+
+    # Kill the catalogd.
+    catalogd = self.cluster.catalogd
+    catalogd.kill()
+
+    # The statestore should detect the catalog service has gone down.
+    statestored = self.cluster.statestored
+    statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS - 1, 
timeout=60)
+
+    try:
+      # Start the catalog service asynchronously.
+      catalogd.start()
+      # Wait 10s to be sure that the catalogd is in the 'trying to connect' 
phase of its
+      # startup.
+      time.sleep(10)
+
+      # Start Hive and wait for catalogd to come up
+      self.run_hive_server()
+      statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, 
timeout=60)
+      impalad.service.wait_for_metric_value('catalog.ready', 1, timeout=60)
+
+      # Metadata load should work now
+      self.reload_metadata(client)
+    finally:
+      # Make sure to clean up the catalogd process that we started
+      self.cleanup_process(catalogd)
+
+  @pytest.mark.execute_serially
+  
@CustomClusterTestSuite.with_args(catalogd_args='--initial_hms_cnxn_timeout_s=30')
+  def test_catalogd_fails_if_hms_started_late(self, vector):
+    """IMPALA-4278: If the HMS is not started within 
initial_hms_cnxn_timeout_s, then the
+    catalogd fails"""
+    # Make sure that catalogd is connected to HMS
+    impalad = self.cluster.get_any_impalad()
+    client = impalad.service.create_beeswax_client()
+    self.reload_metadata(client)
+
+    # Kill Hive
+    kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 
'testdata/bin/kill-hive-server.sh')
+    check_call([kill_cmd], close_fds=True)
+
+    # Kill the catalogd.
+    catalogd = self.cluster.catalogd
+    catalogd.kill()
+
+    # The statestore should detect the catalog service has gone down.
+    statestored = self.cluster.statestored
+    statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS - 1, 
timeout=60)
+
+    try:
+      # Start the catalog service asynchronously.
+      catalogd.start()
+      # Wait 40s to be sure that the catalogd has been trying to connect to 
HMS longer
+      # than initial_hms_cnxn_timeout_s.
+      time.sleep(40)
+
+      # Start Hive
+      self.run_hive_server()
+
+      # catalogd has terminated by now
+      assert catalogd.get_pid() == None, "catalogd should have terminated"
+    finally:
+      # Make sure to clean up the catalogd process that we started
+      self.cleanup_process(catalogd)
+
+    try:
+      # Start the catalog service again and wait for it to come up.
+      catalogd.start()
+      statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, 
timeout=60)
+      impalad.service.wait_for_metric_value('catalog.ready', 1, timeout=60)
+
+      # Metadata load should work now
+      self.reload_metadata(client)
+    finally:
+      # Make sure to clean up the catalogd process that we started
+      self.cleanup_process(catalogd)

Reply via email to