IMPALA-4278: Don't abort Catalog startup quickly if HMS is not present

This change introduces a new catalogd startup option (initial_hms_cnxn_timeout_s) that specifies how many seconds catalogd should spend retrying its initial connection to HMS on startup before giving up and exiting fatally.
Setting this startup option to a value greater than the HMS startup time allows CM to start Impala at the same time as, or even before, HMS. The default value of initial_hms_cnxn_timeout_s is 120 seconds. Change-Id: I546d8fe9836004832ae40110c9fe22b3e704e11b Reviewed-on: http://gerrit.cloudera.org:8080/5095 Reviewed-by: Henry Robinson <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/60414f06 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/60414f06 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/60414f06 Branch: refs/heads/master Commit: 60414f06330aa62d06a67821309c8ff452de5a5f Parents: 76719de Author: Attila Jeges <[email protected]> Authored: Tue Oct 25 21:59:35 2016 +0200 Committer: Internal Jenkins <[email protected]> Committed: Fri Nov 18 03:12:12 2016 +0000 ---------------------------------------------------------------------- be/src/catalog/catalog.cc | 3 + be/src/util/backend-gflag-util.cc | 2 + common/thrift/BackendGflags.thrift | 2 + .../java/org/apache/impala/catalog/Catalog.java | 36 ++-- .../impala/catalog/CatalogServiceCatalog.java | 14 +- .../apache/impala/catalog/ImpaladCatalog.java | 2 +- .../impala/catalog/MetaStoreClientPool.java | 65 +++++-- .../org/apache/impala/service/JniCatalog.java | 13 +- .../testutil/CatalogServiceTestCatalog.java | 9 +- tests/experiments/test_catalog_hms_failures.py | 188 +++++++++++++++++++ 10 files changed, 285 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/be/src/catalog/catalog.cc ---------------------------------------------------------------------- diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc index b3690f6..3ed8d7f 100644 --- a/be/src/catalog/catalog.cc +++ 
b/be/src/catalog/catalog.cc @@ -35,6 +35,9 @@ DEFINE_bool(load_catalog_in_background, false, DEFINE_int32(num_metadata_loading_threads, 16, "(Advanced) The number of metadata loading threads (degree of parallelism) to use " "when loading catalog metadata."); +DEFINE_int32(initial_hms_cnxn_timeout_s, 120, + "Number of seconds catalogd will wait to establish an initial connection to the HMS " + "before exiting."); DEFINE_string(sentry_config, "", "Local path to a sentry-site.xml configuration " "file. If set, authorization will be enabled."); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/be/src/util/backend-gflag-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index f504e57..da62b61 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -29,6 +29,7 @@ DECLARE_bool(load_auth_to_local_rules); DECLARE_int32(non_impala_java_vlog); DECLARE_int32(read_size); DECLARE_int32(num_metadata_loading_threads); +DECLARE_int32(initial_hms_cnxn_timeout_s); DECLARE_int32(kudu_operation_timeout_ms); DECLARE_int64(inc_stats_size_limit_bytes); DECLARE_string(principal); @@ -57,6 +58,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) { cfg.__set_kudu_master_hosts(FLAGS_kudu_master_hosts); cfg.__set_read_size(FLAGS_read_size); cfg.__set_num_metadata_loading_threads(FLAGS_num_metadata_loading_threads); + cfg.__set_initial_hms_cnxn_timeout_s(FLAGS_initial_hms_cnxn_timeout_s); cfg.__set_sentry_config(FLAGS_sentry_config); // auth_to_local rules are read if --load_auth_to_local_rules is set to true // and impala is kerberized. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/common/thrift/BackendGflags.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index ad081a6..09cf6f7 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -52,4 +52,6 @@ struct TBackendGflags { 15: required i32 read_size 16: required i32 kudu_operation_timeout_ms + + 17: required i32 initial_hms_cnxn_timeout_s } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/fe/src/main/java/org/apache/impala/catalog/Catalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java index 733b2f2..403b9c1 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java @@ -17,14 +17,16 @@ package org.apache.impala.catalog; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; -import org.apache.log4j.Logger; - import org.apache.impala.analysis.FunctionName; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.thrift.TCatalogObject; @@ -32,9 +34,7 @@ import org.apache.impala.thrift.TFunction; import org.apache.impala.thrift.TPartitionKeyValue; import org.apache.impala.thrift.TTableName; import org.apache.impala.util.PatternMatcher; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import org.apache.log4j.Logger; /** * Thread safe interface for reading and updating 
metadata stored in the Hive MetaStore. @@ -60,11 +60,10 @@ public abstract class Catalog { // Initial catalog version. public final static long INITIAL_CATALOG_VERSION = 0L; public static final String DEFAULT_DB = "default"; - private static final int META_STORE_CLIENT_POOL_SIZE = 10; - public static final String BUILTINS_DB = "_impala_builtins"; - protected final MetaStoreClientPool metaStoreClientPool_ = new MetaStoreClientPool(0); + protected final MetaStoreClientPool metaStoreClientPool_ = + new MetaStoreClientPool(0, 0); // Cache of authorization policy metadata. Populated from data retried from the // Sentry Service, if configured. @@ -88,19 +87,24 @@ public abstract class Catalog { protected final CatalogObjectCache<HdfsCachePool> hdfsCachePools_ = new CatalogObjectCache<HdfsCachePool>(false); - /** - * Creates a new instance of a Catalog. If initMetastoreClientPool is true, will - * also add META_STORE_CLIENT_POOL_SIZE clients to metastoreClientPool_. - */ - public Catalog(boolean initMetastoreClientPool) { - if (initMetastoreClientPool) { - metaStoreClientPool_.addClients(META_STORE_CLIENT_POOL_SIZE); - } + public Catalog() { dataSources_ = new CatalogObjectCache<DataSource>(); builtinsDb_ = new BuiltinsDb(BUILTINS_DB, this); addDb(builtinsDb_); } + /** + * Creates a new instance of Catalog. It also adds 'numClients' clients to + * 'metastoreClientPool_'. + * 'initialCnxnTimeoutSec' specifies the time (in seconds) Catalog will wait to + * establish an initial connection to the HMS. Using this setting allows catalogd and + * HMS to be started simultaneously. 
+ */ + public Catalog(int numClients, int initialCnxnTimeoutSec) { + this(); + metaStoreClientPool_.initClients(numClients, initialCnxnTimeoutSec); + } + public Db getBuiltinsDb() { return builtinsDb_; } /** http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 203ef6d..7997412 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -107,6 +107,7 @@ import com.google.common.collect.Sets; public class CatalogServiceCatalog extends Catalog { private static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class); + private static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10; private final TUniqueId catalogServiceId_; // Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock @@ -148,13 +149,16 @@ public class CatalogServiceCatalog extends Catalog { private static String localLibraryPath_; /** - * Initialize the CatalogServiceCatalog. If loadInBackground is true, table metadata - * will be loaded in the background + * Initialize the CatalogServiceCatalog. If 'loadInBackground' is true, table metadata + * will be loaded in the background. 'initialHmsCnxnTimeoutSec' specifies the time (in + * seconds) CatalogServiceCatalog will wait to establish an initial connection to the + * HMS before giving up. Using this setting allows catalogd and HMS to be started + * simultaneously. 
*/ public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads, - SentryConfig sentryConfig, TUniqueId catalogServiceId, String kerberosPrincipal, - String localLibraryPath) { - super(true); + int initialHmsCnxnTimeoutSec, SentryConfig sentryConfig, TUniqueId catalogServiceId, + String kerberosPrincipal, String localLibraryPath) { + super(INITIAL_META_STORE_CLIENT_POOL_SIZE, initialHmsCnxnTimeoutSec); catalogServiceId_ = catalogServiceId; tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads); loadInBackground_ = loadInBackground; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java index 3647256..a59f997 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java @@ -96,7 +96,7 @@ public class ImpaladCatalog extends Catalog { * CatalogServer. 
*/ public ImpaladCatalog(String defaultKuduMasterHosts) { - super(false); + super(); defaultKuduMasterHosts_ = defaultKuduMasterHosts; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/fe/src/main/java/org/apache/impala/catalog/MetaStoreClientPool.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/MetaStoreClientPool.java b/fe/src/main/java/org/apache/impala/catalog/MetaStoreClientPool.java index 372fe37..29e5df9 100644 --- a/fe/src/main/java/org/apache/impala/catalog/MetaStoreClientPool.java +++ b/fe/src/main/java/org/apache/impala/catalog/MetaStoreClientPool.java @@ -18,6 +18,7 @@ package org.apache.impala.catalog; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaHook; @@ -72,15 +73,38 @@ public class MetaStoreClientPool { private final IMetaStoreClient hiveClient_; private boolean isInUse_; - private MetaStoreClient(HiveConf hiveConf) { - try { - LOG.debug("Creating MetaStoreClient. Pool Size = " + clientPool_.size()); - hiveClient_ = RetryingMetaStoreClient.getProxy(hiveConf, dummyHookLoader, - HiveMetaStoreClient.class.getName()); - } catch (Exception e) { - // Turn in to an unchecked exception - throw new IllegalStateException(e); + /** + * Creates a new instance of MetaStoreClient. + * 'cnxnTimeoutSec' specifies the time MetaStoreClient will wait to establish first + * connection to the HMS before giving up and failing out with an exception. + */ + private MetaStoreClient(HiveConf hiveConf, int cnxnTimeoutSec) { + LOG.debug("Creating MetaStoreClient. 
Pool Size = " + clientPool_.size()); + + long retryDelaySeconds = hiveConf.getTimeVar( + HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS); + long retryDelayMillis = retryDelaySeconds * 1000; + long endTimeMillis = System.currentTimeMillis() + cnxnTimeoutSec * 1000; + IMetaStoreClient hiveClient = null; + while (true) { + try { + hiveClient = RetryingMetaStoreClient.getProxy(hiveConf, dummyHookLoader, + HiveMetaStoreClient.class.getName()); + break; + } catch (Exception e) { + // If time is up, throw an unchecked exception + long delayUntilMillis = System.currentTimeMillis() + retryDelayMillis; + if (delayUntilMillis >= endTimeMillis) throw new IllegalStateException(e); + + LOG.warn("Failed to connect to Hive MetaStore. Retrying.", e); + while (delayUntilMillis > System.currentTimeMillis()) { + try { + Thread.sleep(delayUntilMillis - System.currentTimeMillis()); + } catch (InterruptedException | IllegalArgumentException ignore) {} + } + } } + hiveClient_ = hiveClient; isInUse_ = false; } @@ -119,23 +143,30 @@ public class MetaStoreClientPool { } } - public MetaStoreClientPool(int initialSize) { - this(initialSize, new HiveConf(MetaStoreClientPool.class)); + public MetaStoreClientPool(int initialSize, int initialCnxnTimeoutSec) { + this(initialSize, initialCnxnTimeoutSec, new HiveConf(MetaStoreClientPool.class)); } - public MetaStoreClientPool(int initialSize, HiveConf hiveConf) { + public MetaStoreClientPool(int initialSize, int initialCnxnTimeoutSec, + HiveConf hiveConf) { hiveConf_ = hiveConf; clientCreationDelayMs_ = hiveConf_.getInt(HIVE_METASTORE_CNXN_DELAY_MS_CONF, DEFAULT_HIVE_METASTORE_CNXN_DELAY_MS_CONF); - addClients(initialSize); + initClients(initialSize, initialCnxnTimeoutSec); } /** - * Add numClients to the client pool. + * Initialize client pool with 'numClients' client. + * 'initialCnxnTimeoutSec' specifies the time (in seconds) the first client will wait to + * establish an initial connection to the HMS. 
*/ - public void addClients(int numClients) { - for (int i = 0; i < numClients; ++i) { - clientPool_.add(new MetaStoreClient(hiveConf_)); + public void initClients(int numClients, int initialCnxnTimeoutSec) { + Preconditions.checkState(clientPool_.size() == 0); + if (numClients > 0) { + clientPool_.add(new MetaStoreClient(hiveConf_, initialCnxnTimeoutSec)); + for (int i = 0; i < numClients - 1; ++i) { + clientPool_.add(new MetaStoreClient(hiveConf_, 0)); + } } } @@ -163,7 +194,7 @@ public class MetaStoreClientPool { } catch (InterruptedException e) { /* ignore */ } - client = new MetaStoreClient(hiveConf_); + client = new MetaStoreClient(hiveConf_, 0); } } client.markInUse(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/fe/src/main/java/org/apache/impala/service/JniCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java index db9ba63..b756230 100644 --- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java +++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java @@ -17,6 +17,10 @@ package org.apache.impala.service; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -56,10 +60,6 @@ import org.apache.thrift.protocol.TBinaryProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; - /** * JNI-callable interface for the CatalogService. The main point is to serialize * and de-serialize thrift structures between C and Java parts of the CatalogService. 
@@ -87,6 +87,7 @@ public class JniCatalog { BackendConfig.create(cfg); Preconditions.checkArgument(cfg.num_metadata_loading_threads > 0); + Preconditions.checkArgument(cfg.initial_hms_cnxn_timeout_s > 0); // This trick saves having to pass a TLogLevel enum, which is an object and more // complex to pass through JNI. GlogAppender.Install(TLogLevel.values()[cfg.impala_log_lvl], @@ -101,8 +102,8 @@ public class JniCatalog { LOG.info(JniUtil.getJavaVersion()); catalog_ = new CatalogServiceCatalog(cfg.load_catalog_in_background, - cfg.num_metadata_loading_threads, sentryConfig, getServiceId(), cfg.principal, - cfg.local_library_path); + cfg.num_metadata_loading_threads, cfg.initial_hms_cnxn_timeout_s, sentryConfig, + getServiceId(), cfg.principal, cfg.local_library_path); try { catalog_.reset(); } catch (CatalogException e) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java index c364ba5..93e4af0 100644 --- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java +++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java @@ -30,9 +30,10 @@ import org.apache.impala.thrift.TUniqueId; public class CatalogServiceTestCatalog extends CatalogServiceCatalog { public CatalogServiceTestCatalog(boolean loadInBackground, int numLoadingThreads, - SentryConfig sentryConfig, TUniqueId catalogServiceId) { - super(loadInBackground, numLoadingThreads, sentryConfig, catalogServiceId, null, - System.getProperty("java.io.tmpdir")); + int initialHmsCnxnTimeoutSec, SentryConfig sentryConfig, + TUniqueId catalogServiceId) { + super(loadInBackground, numLoadingThreads, initialHmsCnxnTimeoutSec, sentryConfig, + catalogServiceId, 
null, System.getProperty("java.io.tmpdir")); // Cache pools are typically loaded asynchronously, but as there is no fixed execution // order for tests, the cache pools are loaded synchronously before the tests are @@ -51,7 +52,7 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog { */ public static CatalogServiceCatalog createWithAuth(SentryConfig config) { CatalogServiceCatalog cs = - new CatalogServiceTestCatalog(false, 16, config, new TUniqueId()); + new CatalogServiceTestCatalog(false, 16, 0, config, new TUniqueId()); try { cs.reset(); } catch (CatalogException e) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/60414f06/tests/experiments/test_catalog_hms_failures.py ---------------------------------------------------------------------- diff --git a/tests/experiments/test_catalog_hms_failures.py b/tests/experiments/test_catalog_hms_failures.py new file mode 100644 index 0000000..efa4fed --- /dev/null +++ b/tests/experiments/test_catalog_hms_failures.py @@ -0,0 +1,188 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +# Test Catalog behavior when HMS is not present + +import os +import pytest +from subprocess import check_call +import time + +from tests.beeswax.impala_beeswax import ImpalaBeeswaxException +from tests.common.custom_cluster_test_suite import ( + CustomClusterTestSuite, + NUM_SUBSCRIBERS) +from tests.util.filesystem_utils import IS_ISILON, IS_LOCAL + +class TestCatalogHMSFailures(CustomClusterTestSuite): + @classmethod + def setup_class(cls): + super(TestCatalogHMSFailures, cls).setup_class() + + @classmethod + def run_hive_server(cls): + script = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/run-hive-server.sh') + run_cmd = [script] + if IS_LOCAL or IS_ISILON: + run_cmd.append('-only_metastore') + check_call(run_cmd, close_fds=True) + + @classmethod + def cleanup_process(cls, proc): + try: + proc.kill() + except: + pass + try: + proc.wait() + except: + pass + + @classmethod + def teardown_class(cls): + # Make sure the metastore is running even if the test aborts somewhere unexpected + # before restarting the metastore itself. + cls.run_hive_server() + super(TestCatalogHMSFailures, cls).teardown_class() + + @classmethod + def reload_metadata(cls, client): + client.execute('invalidate metadata') + client.execute('show databases') + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(catalogd_args='--initial_hms_cnxn_timeout_s=120') + def test_kill_hms_after_catalog_init(self, vector): + """IMPALA-4278: If HMS dies after catalogd initialization, SQL statements that force + metadata load should fail quickly. 
After HMS restart, metadata load should work + again""" + # Make sure that catalogd is connected to HMS + impalad = self.cluster.get_any_impalad() + client = impalad.service.create_beeswax_client() + self.reload_metadata(client) + + # Kill Hive + kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh') + check_call([kill_cmd], close_fds=True) + + # Metadata load should fail quickly + start = time.time() + try: + self.reload_metadata(client) + except ImpalaBeeswaxException as e: + assert "Connection refused" in str(e) + else: + assert False, "Metadata load should have failed" + end = time.time() + assert end - start < 30, "Metadata load hasn't failed quickly enough" + + # Start Hive + self.run_hive_server() + + # Metadata load should work now + self.reload_metadata(client) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(catalogd_args='--initial_hms_cnxn_timeout_s=120') + def test_start_catalog_before_hms(self, vector): + """IMPALA-4278: If catalogd is started with initial_hms_cnxn_timeout_s set to a value + greater than HMS startup time, it will manage to establish connection to HMS even if + HMS is started a little later""" + # Make sure that catalogd is connected to HMS + impalad = self.cluster.get_any_impalad() + client = impalad.service.create_beeswax_client() + self.reload_metadata(client) + + # Kill Hive + kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh') + check_call([kill_cmd], close_fds=True) + + # Kill the catalogd. + catalogd = self.cluster.catalogd + catalogd.kill() + + # The statestore should detect the catalog service has gone down. + statestored = self.cluster.statestored + statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS - 1, timeout=60) + + try: + # Start the catalog service asynchronously. + catalogd.start() + # Wait 10s to be sure that the catalogd is in the 'trying to connect' phase of its + # startup. 
+ time.sleep(10) + + # Start Hive and wait for catalogd to come up + self.run_hive_server() + statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60) + impalad.service.wait_for_metric_value('catalog.ready', 1, timeout=60) + + # Metadata load should work now + self.reload_metadata(client) + finally: + # Make sure to clean up the catalogd process that we started + self.cleanup_process(catalogd) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(catalogd_args='--initial_hms_cnxn_timeout_s=30') + def test_catalogd_fails_if_hms_started_late(self, vector): + """IMPALA-4278: If the HMS is not started within initial_hms_cnxn_timeout_s, then the + catalogd fails""" + # Make sure that catalogd is connected to HMS + impalad = self.cluster.get_any_impalad() + client = impalad.service.create_beeswax_client() + self.reload_metadata(client) + + # Kill Hive + kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh') + check_call([kill_cmd], close_fds=True) + + # Kill the catalogd. + catalogd = self.cluster.catalogd + catalogd.kill() + + # The statestore should detect the catalog service has gone down. + statestored = self.cluster.statestored + statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS - 1, timeout=60) + + try: + # Start the catalog service asynchronously. + catalogd.start() + # Wait 40s to be sure that the catalogd has been trying to connect to HMS longer + # than initial_hms_cnxn_timeout_s. + time.sleep(40) + + # Start Hive + self.run_hive_server() + + # catalogd has terminated by now + assert catalogd.get_pid() == None, "catalogd should have terminated" + finally: + # Make sure to clean up the catalogd process that we started + self.cleanup_process(catalogd) + + try: + # Start the catalog service again and wait for it to come up. 
+ catalogd.start() + statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60) + impalad.service.wait_for_metric_value('catalog.ready', 1, timeout=60) + + # Metadata load should work now + self.reload_metadata(client) + finally: + # Make sure to clean up the catalogd process that we started + self.cleanup_process(catalogd)
