IMPALA-4704: Turns on client connections when the local catalog is initialized.

Currently, impalad starts beeswax and hs2 servers even if the
catalog has not yet been initialized. As a result, client
connections see an error message stating that the impalad
is not yet ready.

This patch changes the impalad startup sequence to wait
until the catalog is received before opening beeswax and hs2 ports
and starting their servers.

Testing:
- Python e2e tests that start a cluster without a catalog
  and check that client connections are rejected as expected.

Change-Id: I52b881cba18a7e4533e21a78751c2e35c3d4c8a6
Reviewed-on: http://gerrit.cloudera.org:8080/8202
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6a2b7a64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6a2b7a64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6a2b7a64

Branch: refs/heads/master
Commit: 6a2b7a64fb1fd710fe2e3c2a106a3bf589fefe76
Parents: 11bbc26
Author: Vuk Ercegovac <[email protected]>
Authored: Fri Sep 29 18:51:02 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Mon Nov 13 21:14:14 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/expr-benchmark.cc             |   2 +-
 be/src/common/global-flags.cc                   |   3 +
 be/src/runtime/exec-env.cc                      |  14 +-
 be/src/runtime/exec-env.h                       |   8 +-
 be/src/service/frontend.cc                      |  26 +++-
 be/src/service/frontend.h                       |  16 ++-
 be/src/service/impala-server.cc                 | 144 ++++++++++---------
 be/src/service/impala-server.h                  |  41 ++++--
 be/src/service/impalad-main.cc                  |  12 +-
 be/src/testutil/in-process-servers.cc           |  16 +--
 be/src/testutil/in-process-servers.h            |   2 +-
 bin/start-impala-cluster.py                     |  70 ++++++---
 .../org/apache/impala/service/Frontend.java     |  28 +++-
 .../org/apache/impala/service/JniFrontend.java  |   4 +-
 .../org/apache/impala/service/FrontendTest.java |  64 ---------
 tests/common/custom_cluster_test_suite.py       |  23 ++-
 tests/common/impala_cluster.py                  |   6 +-
 tests/common/impala_service.py                  |   2 +-
 tests/custom_cluster/test_catalog_wait.py       |  79 ++++++++++
 tests/custom_cluster/test_coordinators.py       |   2 +
 20 files changed, 347 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc 
b/be/src/benchmarks/expr-benchmark.cc
index f1208f8..b10a70f 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -65,7 +65,7 @@ using namespace impala;
 class Planner {
  public:
   Planner() {
-    ABORT_IF_ERROR(frontend_.SetCatalogInitialized());
+    frontend_.SetCatalogIsReady();
     ABORT_IF_ERROR(exec_env_.InitForFeTests());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 29c16a5..c2b8e42 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -140,6 +140,9 @@ DEFINE_int32(stress_scratch_write_delay_ms, 0, "A stress 
option which causes wri
 DEFINE_bool(thread_creation_fault_injection, false, "A fault injection option 
that "
     " causes calls to Thread::Create() to fail randomly 1% of the time on 
eligible "
     " codepaths. Effective in debug builds only.");
+DEFINE_int32(stress_catalog_init_delay_ms, 0, "A stress option that injects 
extra delay"
+    " in milliseconds when initializing an impalad's local catalog replica. 
Delay <= 0"
+    " inject no delay.");
 #endif
 
 // Used for testing the path where the Kudu client is stubbed.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 0ceb636..999b56a 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -382,8 +382,8 @@ Status ExecEnv::Init() {
   return Status::OK();
 }
 
-Status ExecEnv::StartServices() {
-  LOG(INFO) << "Starting global services";
+Status ExecEnv::StartStatestoreSubscriberService() {
+  LOG(INFO) << "Starting statestore subscriber service";
 
   // Must happen after all topic registrations / callbacks are done
   if (statestore_subscriber_.get() != nullptr) {
@@ -394,8 +394,14 @@ Status ExecEnv::StartServices() {
     }
   }
 
-  // Start this last so everything is in place before accepting the first call.
-  if (FLAGS_use_krpc) RETURN_IF_ERROR(rpc_mgr_->StartServices(krpc_address_));
+  return Status::OK();
+}
+
+Status ExecEnv::StartKrpcService() {
+  if (FLAGS_use_krpc) {
+    LOG(INFO) << "Starting KRPC service";
+    RETURN_IF_ERROR(rpc_mgr_->StartServices(krpc_address_));
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index df0d926..8fafdc5 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -91,8 +91,12 @@ class ExecEnv {
   /// subsystems like the webserver, scheduler etc.
   Status Init();
 
-  /// Starts any dependent services in their correct order
-  Status StartServices() WARN_UNUSED_RESULT;
+  /// Starts the service to subscribe to the statestore.
+  Status StartStatestoreSubscriberService() WARN_UNUSED_RESULT;
+
+  /// Starts krpc, if needed. Start this last so everything is in place before 
accepting
+  /// the first call.
+  Status StartKrpcService() WARN_UNUSED_RESULT;
 
   /// TODO: Should ExecEnv own the ImpalaServer as well?
   void SetImpalaServer(ImpalaServer* server) { impala_server_ = server; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/service/frontend.cc
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index e48cb1e..df39114 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -24,9 +24,14 @@
 #include "rpc/jni-thrift-util.h"
 #include "util/backend-gflag-util.h"
 #include "util/jni-util.h"
+#include "util/time.h"
 
 #include "common/names.h"
 
+#ifndef NDEBUG
+DECLARE_int32(stress_catalog_init_delay_ms);
+#endif
+
 using namespace impala;
 
 // Authorization related flags. Must be set to valid values to properly 
configure
@@ -77,7 +82,8 @@ Frontend::Frontend() {
     {"getRoles", "([B)[B", &show_roles_id_},
     {"getRolePrivileges", "([B)[B", &get_role_privileges_id_},
     {"execHiveServer2MetadataOp", "([B)[B", &exec_hs2_metadata_op_id_},
-    {"setCatalogInitialized", "()V", &set_catalog_initialized_id_},
+    {"setCatalogIsReady", "()V", &set_catalog_is_ready_id_},
+    {"waitForCatalog", "()V", &wait_for_catalog_id_},
     {"loadTableData", "([B)[B", &load_table_data_id_},
     {"getTableFiles", "([B)[B", &get_table_files_id_},
     {"showCreateFunction", "([B)Ljava/lang/String;", 
&show_create_function_id_},
@@ -238,13 +244,19 @@ bool Frontend::IsAuthorizationError(const Status& status) 
{
   return !status.ok() && status.GetDetail().find("AuthorizationException") == 
0;
 }
 
-Status Frontend::SetCatalogInitialized() {
+void Frontend::SetCatalogIsReady() {
   JNIEnv* jni_env = getJNIEnv();
-  JniLocalFrame jni_frame;
-  RETURN_IF_ERROR(jni_frame.push(jni_env));
-  jni_env->CallObjectMethod(fe_, set_catalog_initialized_id_);
-  RETURN_ERROR_IF_EXC(jni_env);
-  return Status::OK();
+  jni_env->CallVoidMethod(fe_, set_catalog_is_ready_id_);
+}
+
+void Frontend::WaitForCatalog() {
+#ifndef NDEBUG
+  if (FLAGS_stress_catalog_init_delay_ms > 0) {
+    SleepForMs(FLAGS_stress_catalog_init_delay_ms);
+  }
+#endif
+  JNIEnv* jni_env = getJNIEnv();
+  jni_env->CallVoidMethod(fe_, wait_for_catalog_id_);
 }
 
 Status Frontend::GetTableFiles(const TShowFilesParams& params, TResultSet* 
result) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/service/frontend.h
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index c5c4895..4881220 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -159,11 +159,14 @@ class Frontend {
   /// Returns true if the error returned by the FE was due to an 
AuthorizationException.
   static bool IsAuthorizationError(const Status& status);
 
-  /// Sets the FE catalog to be initialized. This is only used for testing in
-  /// conjunction with InProcessImpalaServer. This sets the FE catalog to
-  /// be initialized, ready to receive queries without needing a catalog
-  /// server.
-  Status SetCatalogInitialized();
+  /// Sets the frontend's catalog in the ready state. This is only used for 
testing in
+  /// conjunction with InProcessImpalaServer. This sets the frontend's catalog 
as
+  /// ready, so can receive queries without needing a catalog server.
+  void SetCatalogIsReady();
+
+  /// Waits for the FE catalog to be initialized and ready to receive queries.
+  /// There is no bound on the wait time.
+  void WaitForCatalog();
 
   /// Call FE to get files info for a table or partition.
   Status GetTableFiles(const TShowFilesParams& params, TResultSet* result);
@@ -197,7 +200,8 @@ class Frontend {
   jmethodID get_role_privileges_id_; // JniFrontend.getRolePrivileges
   jmethodID exec_hs2_metadata_op_id_; // JniFrontend.execHiveServer2MetadataOp
   jmethodID load_table_data_id_; // JniFrontend.loadTableData
-  jmethodID set_catalog_initialized_id_; // JniFrontend.setCatalogInitialized
+  jmethodID set_catalog_is_ready_id_; // JniFrontend.setCatalogIsReady
+  jmethodID wait_for_catalog_id_; // JniFrontend.waitForCatalog
   jmethodID get_table_files_id_; // JniFrontend.getTableFiles
   jmethodID show_create_function_id_; // JniFrontend.showCreateFunction
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 4c540b7..0aecece 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -274,7 +274,8 @@ class CancellationWork {
 
 ImpalaServer::ImpalaServer(ExecEnv* exec_env)
     : exec_env_(exec_env),
-      thrift_serializer_(false) {
+      thrift_serializer_(false),
+      services_started_(false) {
   // Initialize default config
   InitializeConfigVariables();
 
@@ -1468,7 +1469,7 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
       
update_req.__set_removed_objects(catalog_update_result.removed_catalog_objects);
     }
 
-     // Apply the changes to the local catalog cache.
+    // Apply the changes to the local catalog cache.
     TUpdateCatalogCacheResponse resp;
     Status status = exec_env_->frontend()->UpdateCatalogCache(
         vector<TUpdateCatalogCacheRequest>{update_req}, &resp);
@@ -1540,7 +1541,8 @@ void ImpalaServer::MembershipCallback(
     }
 
     // Register the local backend in the statestore and update the list of 
known backends.
-    AddLocalBackendToStatestore(subscriber_topic_updates);
+    // Only register if all ports have been opened and are ready.
+    if (services_started_.load()) 
AddLocalBackendToStatestore(subscriber_topic_updates);
 
     // Create a set of known backend network addresses. Used to test for 
cluster
     // membership by network address.
@@ -1924,21 +1926,28 @@ void ImpalaServer::RegisterSessionTimeout(int32_t 
session_timeout) {
   }
 }
 
-Status ImpalaServer::Init(int32_t thrift_be_port, int32_t beeswax_port, 
int32_t hs2_port) {
+Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
+   int32_t hs2_port) {
   exec_env_->SetImpalaServer(this);
-  boost::shared_ptr<ImpalaServer> handler = shared_from_this();
 
   if (!FLAGS_is_coordinator && !FLAGS_is_executor) {
     return Status("Impala does not have a valid role configured. "
         "Either --is_coordinator or --is_executor must be set to true.");
   }
 
+  // Subscribe with the statestore. Coordinators need to subscribe to the 
catalog topic
+  // then wait for the initial catalog update.
+  RETURN_IF_ERROR(exec_env_->StartStatestoreSubscriberService());
+
+  if (FLAGS_is_coordinator) exec_env_->frontend()->WaitForCatalog();
+
   SSLProtocol ssl_version = SSLProtocol::TLSv1_0;
   if (!FLAGS_ssl_server_certificate.empty() || EnableInternalSslConnections()) 
{
     RETURN_IF_ERROR(
         SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, 
&ssl_version));
   }
 
+  // Start the internal service.
   if (thrift_be_port > 0) {
     boost::shared_ptr<ImpalaInternalService> thrift_if(new 
ImpalaInternalService());
     boost::shared_ptr<TProcessor> be_processor(
@@ -1962,88 +1971,85 @@ Status ImpalaServer::Init(int32_t thrift_be_port, 
int32_t beeswax_port, int32_t
   }
 
   if (!FLAGS_is_coordinator) {
-    // We don't start the Beeswax and HS2 servers if this impala daemon is 
just an
-    // executor.
-    LOG(INFO) << "Started executor Impala server on "
+    LOG(INFO) << "Initialized executor Impala server on "
               << ExecEnv::GetInstance()->backend_address();
-    return Status::OK();
-  }
-
-  // Start the Beeswax and HS2 servers.
-  if (beeswax_port > 0) {
-    boost::shared_ptr<TProcessor> beeswax_processor(new 
ImpalaServiceProcessor(handler));
-    boost::shared_ptr<TProcessorEventHandler> event_handler(
-        new RpcEventHandler("beeswax", exec_env_->metrics()));
-    beeswax_processor->setEventHandler(event_handler);
-    ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, 
beeswax_port);
+  } else {
+    // Initialize the client servers.
+    boost::shared_ptr<ImpalaServer> handler = shared_from_this();
+    if (beeswax_port > 0) {
+      boost::shared_ptr<TProcessor> beeswax_processor(
+          new ImpalaServiceProcessor(handler));
+      boost::shared_ptr<TProcessorEventHandler> event_handler(
+          new RpcEventHandler("beeswax", exec_env_->metrics()));
+      beeswax_processor->setEventHandler(event_handler);
+      ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, 
beeswax_port);
+
+      if (!FLAGS_ssl_server_certificate.empty()) {
+        LOG(INFO) << "Enabling SSL for Beeswax";
+        builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
+              .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
+              .ssl_version(ssl_version)
+              .cipher_list(FLAGS_ssl_cipher_list);
+      }
 
-    if (!FLAGS_ssl_server_certificate.empty()) {
-      LOG(INFO) << "Enabling SSL for Beeswax";
-      builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
-          .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
-          .ssl_version(ssl_version)
-          .cipher_list(FLAGS_ssl_cipher_list);
+      ThriftServer* server;
+      RETURN_IF_ERROR(
+          
builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
+          .metrics(exec_env_->metrics())
+          .max_concurrent_connections(FLAGS_fe_service_threads)
+          .Build(&server));
+      beeswax_server_.reset(server);
+      beeswax_server_->SetConnectionHandler(this);
     }
 
-    ThriftServer* server;
-    RETURN_IF_ERROR(
-        
builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
-            .metrics(exec_env_->metrics())
-            .max_concurrent_connections(FLAGS_fe_service_threads)
-            .Build(&server));
-    beeswax_server_.reset(server);
-    beeswax_server_->SetConnectionHandler(this);
-  }
-
-  if (hs2_port > 0) {
-    boost::shared_ptr<TProcessor> hs2_fe_processor(
-        new ImpalaHiveServer2ServiceProcessor(handler));
-    boost::shared_ptr<TProcessorEventHandler> event_handler(
-        new RpcEventHandler("hs2", exec_env_->metrics()));
-    hs2_fe_processor->setEventHandler(event_handler);
-
-    ThriftServerBuilder builder(HS2_SERVER_NAME, hs2_fe_processor, hs2_port);
+    if (hs2_port > 0) {
+      boost::shared_ptr<TProcessor> hs2_fe_processor(
+          new ImpalaHiveServer2ServiceProcessor(handler));
+      boost::shared_ptr<TProcessorEventHandler> event_handler(
+          new RpcEventHandler("hs2", exec_env_->metrics()));
+      hs2_fe_processor->setEventHandler(event_handler);
+
+      ThriftServerBuilder builder(HS2_SERVER_NAME, hs2_fe_processor, hs2_port);
+
+      if (!FLAGS_ssl_server_certificate.empty()) {
+        LOG(INFO) << "Enabling SSL for HiveServer2";
+        builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
+              .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
+              .ssl_version(ssl_version)
+              .cipher_list(FLAGS_ssl_cipher_list);
+      }
 
-    if (!FLAGS_ssl_server_certificate.empty()) {
-      LOG(INFO) << "Enabling SSL for HiveServer2";
-      builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
-          .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
-          .ssl_version(ssl_version)
-          .cipher_list(FLAGS_ssl_cipher_list);
+      ThriftServer* server;
+      RETURN_IF_ERROR(
+          
builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
+          .metrics(exec_env_->metrics())
+          .max_concurrent_connections(FLAGS_fe_service_threads)
+          .Build(&server));
+      hs2_server_.reset(server);
+      hs2_server_->SetConnectionHandler(this);
     }
-
-    ThriftServer* server;
-    RETURN_IF_ERROR(
-        
builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
-            .metrics(exec_env_->metrics())
-            .max_concurrent_connections(FLAGS_fe_service_threads)
-            .Build(&server));
-    hs2_server_.reset(server);
-    hs2_server_->SetConnectionHandler(this);
-
   }
+  LOG(INFO) << "Initialized coordinator/executor Impala server on "
+      << ExecEnv::GetInstance()->backend_address();
 
-  LOG(INFO) << "Started coordinator/executor Impala server on "
-            << ExecEnv::GetInstance()->backend_address();
-
-  return Status::OK();
-}
-
-Status ImpalaServer::Start() {
-  RETURN_IF_ERROR(exec_env_->StartServices());
+  // Start the RPC services.
+  RETURN_IF_ERROR(exec_env_->StartKrpcService());
   if (thrift_be_server_.get()) {
     RETURN_IF_ERROR(thrift_be_server_->Start());
     LOG(INFO) << "Impala InternalService listening on " << 
thrift_be_server_->port();
   }
-
   if (hs2_server_.get()) {
     RETURN_IF_ERROR(hs2_server_->Start());
-    LOG(INFO) << "Impala HiveServer2 Service listening on " << 
beeswax_server_->port();
+    LOG(INFO) << "Impala HiveServer2 Service listening on " << 
hs2_server_->port();
   }
   if (beeswax_server_.get()) {
     RETURN_IF_ERROR(beeswax_server_->Start());
-    LOG(INFO) << "Impala Beeswax Service listening on " << hs2_server_->port();
+    LOG(INFO) << "Impala Beeswax Service listening on " << 
beeswax_server_->port();
   }
+  services_started_ = true;
+  ImpaladMetrics::IMPALA_SERVER_READY->set_value(true);
+  LOG(INFO) << "Impala has started.";
+
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 81ec929..c808c35 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -18,6 +18,7 @@
 #ifndef IMPALA_SERVICE_IMPALA_SERVER_H
 #define IMPALA_SERVICE_IMPALA_SERVER_H
 
+#include <atomic>
 #include <boost/thread/mutex.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/scoped_ptr.hpp>
@@ -75,6 +76,30 @@ class ClientRequestState;
 /// An ImpalaServer contains both frontend and backend functionality;
 /// it implements ImpalaService (Beeswax), ImpalaHiveServer2Service 
(HiveServer2)
 /// and ImpalaInternalService APIs.
+/// ImpalaServer can be started in 1 of 3 roles: executor, coordinator, or 
both executor
+/// and coordinator. All roles start ImpalaInternalService API's. The
+/// coordinator role additionally starts client API's (Beeswax and 
HiveServer2).
+///
+/// Startup Sequence
+/// ----------------
+/// The startup sequence opens and starts all services so that they are ready 
to be used
+/// by clients at the same time. The Impala server is considered 'ready' only 
when it can
+/// process requests with all of its specified roles. Avoiding states where 
some roles are
+/// ready and some are not makes it easier to reason about the state of the 
server.
+///
+/// Main thread (caller code), after instantiating the server, must call 
Start().
+/// Start() does the following:
+///    - Start internal services
+///    - Wait (indefinitely) for local catalog to be initialized from 
statestore
+///      (if coordinator)
+///    - Open ImpalaInternalService ports
+///    - Open client ports (if coordinator)
+///    - Start ImpalaInternalService API
+///    - Start client service API's (if coordinator)
+///    - Set services_started_ flag
+///
+/// Internally, the Membership callback thread also participates in startup:
+///    - If services_started_, then register to the statestore as an executor.
 ///
 /// Locking
 /// -------
@@ -118,14 +143,10 @@ class ImpalaServer : public ImpalaServiceIf,
   ImpalaServer(ExecEnv* exec_env);
   ~ImpalaServer();
 
-  /// Initializes RPC services and other subsystems (like audit logging). 
Returns an error
-  /// if initialization failed. If any ports are <= 0, their respective 
service will not
-  /// be started.
-  Status Init(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port);
-
-  /// Starts client and internal services. Does not block. Returns an error if 
any service
-  /// failed to start.
-  Status Start();
+  /// Initializes and starts RPC services and other subsystems (like audit 
logging).
+  /// Returns an error if starting any services failed. If the port is <= 0, 
their
+  ///respective service will not be started.
+  Status Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port);
 
   /// Blocks until the server shuts down (by calling Shutdown()).
   void Join();
@@ -1015,6 +1036,10 @@ class ImpalaServer : public ImpalaServiceIf,
   boost::scoped_ptr<ThriftServer> hs2_server_;
   boost::scoped_ptr<ThriftServer> thrift_be_server_;
 
+  /// Flag that records if backend and/or client services have been started. 
The flag is
+  /// set after all services required for the server have been started.
+  std::atomic_bool services_started_;
+
   /// Set to true when this ImpalaServer should shut down.
   Promise<bool> shutdown_promise_;
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index c9627d9..a7f6abd 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -84,21 +84,15 @@ int ImpaladMain(int argc, char** argv) {
   InitRpcEventTracing(exec_env.webserver());
 
   boost::shared_ptr<ImpalaServer> impala_server(new ImpalaServer(&exec_env));
-  ABORT_IF_ERROR(impala_server->Init(FLAGS_be_port, FLAGS_beeswax_port, 
FLAGS_hs2_port));
-
-  DCHECK(exec_env.process_mem_tracker() != nullptr)
-      << "ExecEnv::StartServices() must be called before starting RPC 
services";
-  Status status = impala_server->Start();
+  Status status =
+      impala_server->Start(FLAGS_be_port, FLAGS_beeswax_port, FLAGS_hs2_port);
   if (!status.ok()) {
     LOG(ERROR) << "Impalad services did not start correctly, exiting.  Error: "
-               << status.GetDetail();
+        << status.GetDetail();
     ShutdownLogging();
     exit(1);
   }
 
-  ImpaladMetrics::IMPALA_SERVER_READY->set_value(true);
-  LOG(INFO) << "Impala has started.";
-
   impala_server->Join();
 
   return 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc 
b/be/src/testutil/in-process-servers.cc
index 64d681c..4817d7f 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -72,11 +72,9 @@ InProcessImpalaServer* 
InProcessImpalaServer::StartWithEphemeralPorts(
     // Start the daemon and check if it works, if not delete the current 
server object and
     // pick a new set of ports
     Status started = impala->StartWithClientServers(beeswax_port, hs2_port);
-    if (started.ok()) {
-      const Status status = impala->SetCatalogInitialized();
-      if (!status.ok()) LOG(WARNING) << status.GetDetail();
-      return impala;
-    }
+    if (started.ok()) return impala;
+    LOG(WARNING) << started.GetDetail();
+
     delete impala;
   }
   DCHECK(false) << "Could not find port to start Impalad.";
@@ -94,9 +92,9 @@ InProcessImpalaServer::InProcessImpalaServer(const string& 
hostname, int backend
           webserver_port, statestore_host, statestore_port)) {
 }
 
-Status InProcessImpalaServer::SetCatalogInitialized() {
+void InProcessImpalaServer::SetCatalogIsReady() {
   DCHECK(impala_server_ != NULL) << "Call Start*() first.";
-  return exec_env_->frontend()->SetCatalogInitialized();
+  exec_env_->frontend()->SetCatalogIsReady();
 }
 
 Status InProcessImpalaServer::StartWithClientServers(int beeswax_port, int 
hs2_port) {
@@ -105,8 +103,8 @@ Status InProcessImpalaServer::StartWithClientServers(int 
beeswax_port, int hs2_p
   hs2_port_ = hs2_port;
 
   impala_server_.reset(new ImpalaServer(exec_env_.get()));
-  RETURN_IF_ERROR(impala_server_->Init(backend_port_, beeswax_port, hs2_port));
-  RETURN_IF_ERROR(impala_server_->Start());
+  SetCatalogIsReady();
+  RETURN_IF_ERROR(impala_server_->Start(backend_port_, beeswax_port, 
hs2_port));
 
   // Wait for up to 1s for the backend server to start
   RETURN_IF_ERROR(WaitForServer(hostname_, backend_port_, 10, 100));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/testutil/in-process-servers.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.h 
b/be/src/testutil/in-process-servers.h
index 6842255..fb69135 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -70,7 +70,7 @@ class InProcessImpalaServer {
 
   /// Sets the catalog on this impalad to be initialized. If we don't
   /// start up a catalogd, then there is no one to initialize it otherwise.
-  Status SetCatalogInitialized();
+  void SetCatalogIsReady();
 
   uint32_t beeswax_port() const { return beeswax_port_; }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 670e693..e7dcea4 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -25,7 +25,8 @@ import psutil
 import sys
 from getpass import getuser
 from time import sleep, time
-from optparse import OptionParser
+from optparse import OptionParser, SUPPRESS_HELP
+from testdata.common import cgroups
 
 KUDU_MASTER_HOSTS = os.getenv('KUDU_MASTER_HOSTS', '127.0.0.1')
 DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get('IMPALA_MAX_LOG_FILES', 10)
@@ -69,9 +70,6 @@ parser.add_option('--max_log_files', 
default=DEFAULT_IMPALA_MAX_LOG_FILES,
                   help='Max number of log files before rotation occurs.')
 parser.add_option("-v", "--verbose", dest="verbose", action="store_true", 
default=False,
                   help="Prints all output to stderr/stdout.")
-parser.add_option("--wait_for_cluster", dest="wait_for_cluster", 
action="store_true",
-                  default=False, help="Wait until the cluster is ready to 
accept "
-                  "queries before returning.")
 parser.add_option("--log_level", type="int", dest="log_level", default=1,
                    help="Set the impalad backend logging level")
 parser.add_option("--jvm_args", dest="jvm_args", default="",
@@ -80,6 +78,11 @@ parser.add_option("--kudu_master_hosts", 
default=KUDU_MASTER_HOSTS,
                   help="The host name or address of the Kudu master. Multiple 
masters "
                       "can be specified using a comma separated list.")
 
+# For testing: list of comma-separated delays, in milliseconds, that delay 
impalad catalog
+# replica initialization. The ith delay is applied to the ith impalad.
+parser.add_option("--catalog_init_delays", dest="catalog_init_delays", 
default="",
+                  help=SUPPRESS_HELP)
+
 options, args = parser.parse_args()
 
 IMPALA_HOME = os.environ['IMPALA_HOME']
@@ -217,6 +220,10 @@ def start_impalad_instances(cluster_size, 
num_coordinators, use_exclusive_coordi
   # --impalad_args flag. virtual_memory().total returns the total physical 
memory.
   mem_limit = int(0.8 * psutil.virtual_memory().total / cluster_size)
 
+  delay_list = []
+  if options.catalog_init_delays != "":
+    delay_list = [delay.strip() for delay in 
options.catalog_init_delays.split(",")]
+
   # Start each impalad instance and optionally redirect the output to a log 
file.
   for i in range(cluster_size):
     if i == 0:
@@ -248,6 +255,9 @@ def start_impalad_instances(cluster_size, num_coordinators, 
use_exclusive_coordi
       # Coordinator instance that doesn't execute non-coordinator fragments
       args = "-is_executor=false %s" % (args)
 
+    if i < len(delay_list):
+      args = "-stress_catalog_init_delay_ms=%s %s" % (delay_list[i], args)
+
     stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % 
service_name)
     exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
 
@@ -281,37 +291,53 @@ def 
wait_for_cluster_web(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
   A cluster is deemed "ready" if:
     - All backends are registered with the statestore.
     - Each impalad knows about all other impalads.
+    - Each coordinator impalad's catalog cache is ready.
   This information is retrieved by querying the statestore debug webpage
   and each individual impalad's metrics webpage.
   """
   impala_cluster = ImpalaCluster()
   # impalad processes may take a while to come up.
   wait_for_impala_process_count(impala_cluster)
+
+  # TODO: fix this for coordinator-only nodes as well.
+  expected_num_backends = options.cluster_size
+  if options.catalog_init_delays != "":
+    for delay in options.catalog_init_delays.split(","):
+      if int(delay.strip()) != 0: expected_num_backends -= 1
+
   for impalad in impala_cluster.impalads:
-    impalad.service.wait_for_num_known_live_backends(options.cluster_size,
+    impalad.service.wait_for_num_known_live_backends(expected_num_backends,
         timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
-    if impalad._get_arg_value('is_coordinator', default='true') == 'true':
-      wait_for_catalog(impalad, 
timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS)
+    if impalad._get_arg_value('is_coordinator', default='true') == 'true' and \
+       impalad._get_arg_value('stress_catalog_init_delay_ms', default=0) == 0:
+      wait_for_catalog(impalad)
 
-def wait_for_catalog(impalad, timeout_in_seconds):
-  """Waits for the impalad catalog to become ready"""
+def wait_for_catalog(impalad, 
timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
+  """Waits for a catalog copy to be received by the impalad. Once it is received,
+     additionally waits for the client ports to be opened."""
   start_time = time()
-  catalog_ready = False
-  attempt = 0
-  while (time() - start_time < timeout_in_seconds and not catalog_ready):
+  client_beeswax = None
+  client_hs2 = None
+  num_dbs = 0
+  num_tbls = 0
+  while (time() - start_time < timeout_in_seconds):
     try:
       num_dbs = impalad.service.get_metric_value('catalog.num-databases')
       num_tbls = impalad.service.get_metric_value('catalog.num-tables')
-      catalog_ready = impalad.service.get_metric_value('catalog.ready')
-      if catalog_ready or attempt % 4 == 0:
-          print 'Waiting for Catalog... Status: %s DBs / %s tables (ready=%s)' 
%\
-              (num_dbs, num_tbls, catalog_ready)
-      attempt += 1
-    except Exception, e:
-      print e
+      client_beeswax = impalad.service.create_beeswax_client()
+      client_hs2 = impalad.service.create_hs2_client()
+      break
+    except Exception as e:
+      print 'Client services not ready.'
+      print 'Waiting for catalog cache: (%s DBs / %s tables). Trying again 
...' %\
+        (num_dbs, num_tbls)
+    finally:
+      if client_beeswax is not None: client_beeswax.close()
     sleep(0.5)
-  if not catalog_ready:
-    raise RuntimeError('Catalog was not initialized in expected time period.')
+
+  if client_beeswax is None or client_hs2 is None:
+    raise RuntimeError('Unable to open client ports within %s seconds.'\
+                       % timeout_in_seconds)
 
 def 
wait_for_cluster_cmdline(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
   """Checks if the cluster is "ready" by executing a simple query in a loop"""
@@ -389,6 +415,8 @@ if __name__ == "__main__":
                             options.use_exclusive_coordinators)
     # Sleep briefly to reduce log spam: the cluster takes some time to start 
up.
     sleep(3)
+
+    # Check for the cluster to be ready.
     wait_for_cluster()
   except Exception, e:
     print 'Error starting cluster: %s' % e

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 63941c1..f9a29f4 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -856,6 +856,28 @@ public class Frontend {
   }
 
   /**
+   * Waits indefinitely for the local catalog to be ready. The catalog is 
"ready" after
+   * the first catalog update is received from the statestore.
+   *
+   * @see ImpaladCatalog#isReady()
+   */
+  public void waitForCatalog() {
+    LOG.info("Waiting for first catalog update from the statestore.");
+    int numTries = 0;
+    long startTimeMs = System.currentTimeMillis();
+    while (true) {
+      if (getCatalog().isReady()) {
+        LOG.info("Local catalog initialized after: " +
+            (System.currentTimeMillis() - startTimeMs) + " ms.");
+        return;
+      }
+      LOG.info("Waiting for local catalog to be initialized, attempt: " + 
numTries);
+      getCatalog().waitForCatalogUpdate(MAX_CATALOG_UPDATE_WAIT_TIME_MS);
+      ++numTries;
+    }
+  }
+
+  /**
    * Overload of requestTblLoadAndWait that uses the default timeout.
    */
   public boolean requestTblLoadAndWait(Set<TableName> requestedTbls)
@@ -879,10 +901,8 @@ public class Frontend {
    */
   private AnalysisContext.AnalysisResult analyzeStmt(TQueryCtx queryCtx)
       throws AnalysisException, InternalException, AuthorizationException {
-    if (!impaladCatalog_.get().isReady()) {
-      throw new AnalysisException("This Impala daemon is not ready to accept 
user " +
-          "requests. Status: Waiting for catalog update from the StateStore.");
-    }
+    Preconditions.checkState(getCatalog().isReady(),
+        "Local catalog has not been initialized. Aborting query analysis.");
 
     AnalysisContext analysisCtx = new AnalysisContext(impaladCatalog_.get(), 
queryCtx,
         authzConfig_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/fe/src/main/java/org/apache/impala/service/JniFrontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java 
b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index cfd83a5..688bd0e 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -580,10 +580,12 @@ public class JniFrontend {
     }
   }
 
-  public void setCatalogInitialized() {
+  public void setCatalogIsReady() {
     frontend_.getCatalog().setIsReady(true);
   }
 
+  public void waitForCatalog() { frontend_.waitForCatalog(); }
+
   // Caching this saves ~50ms per call to getHadoopConfigAsHtml
   private static final Configuration CONF = new Configuration();
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/fe/src/test/java/org/apache/impala/service/FrontendTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/service/FrontendTest.java 
b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
index deab9eb..c87882d 100644
--- a/fe/src/test/java/org/apache/impala/service/FrontendTest.java
+++ b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
@@ -66,70 +66,6 @@ import com.google.common.collect.Sets;
 public class FrontendTest extends FrontendTestBase {
 
   @Test
-  public void TestCatalogReadiness() throws ImpalaException {
-    // Test different authorization configurations.
-    List<AuthorizationConfig> authzConfigs = Lists.newArrayList();
-    authzConfigs.add(AuthorizationConfig.createAuthDisabledConfig());
-    authzConfigs.add(AuthorizationTest.createPolicyFileAuthzConfig());
-    authzConfigs.add(AuthorizationTest.createSentryServiceAuthzConfig());
-    // Test the behavior with different stmt types.
-    List<String> testStmts = Lists.newArrayList();
-    testStmts.add("select * from functional.alltypesagg");
-    testStmts.add("select 1");
-    testStmts.add("show tables in tpch");
-    testStmts.add("create table tpch.ready_test (i int)");
-    testStmts.add("insert into functional.alltypes partition (year, month) " +
-        "select * from functional.alltypestiny");
-    for (AuthorizationConfig authzConfig: authzConfigs) {
-      ImpaladTestCatalog catalog = new ImpaladTestCatalog(authzConfig);
-      Frontend fe = new Frontend(authzConfig, catalog);
-
-      // When the catalog is ready, all stmts should pass analysis.
-      Preconditions.checkState(catalog.isReady());
-      for (String stmt: testStmts) testCatalogIsReady(stmt, fe);
-
-      // When the catalog is not ready, all stmts should fail analysis.
-      catalog.setIsReady(false);
-      for (String stmt: testStmts) testCatalogIsNotReady(stmt, fe);
-    }
-  }
-
-  /**
-   * Creates an exec request from 'stmt' using the given 'fe'.
-   * Expects that no exception is thrown.
-   */
-  private void testCatalogIsReady(String stmt, Frontend fe) {
-    System.out.println(stmt);
-    TQueryCtx queryCtx = TestUtils.createQueryContext(
-        Catalog.DEFAULT_DB, AuthorizationTest.USER.getName());
-    queryCtx.client_request.setStmt(stmt);
-    try {
-      fe.createExecRequest(queryCtx, new StringBuilder());
-    } catch (Exception e) {
-      fail("Failed to create exec request due to: " + 
ExceptionUtils.getStackTrace(e));
-    }
-  }
-
-  /**
-   * Creates an exec request from 'stmt' using the given 'fe'.
-   * Expects that the stmt fails to analyze because the catalog is not ready.
-   */
-  private void testCatalogIsNotReady(String stmt, Frontend fe) {
-    TQueryCtx queryCtx = TestUtils.createQueryContext(
-        Catalog.DEFAULT_DB, AuthorizationTest.USER.getName());
-    queryCtx.client_request.setStmt(stmt);
-    try {
-      fe.createExecRequest(queryCtx, new StringBuilder());
-      fail("Expected failure to due uninitialized catalog.");
-    } catch (AnalysisException e) {
-      assertEquals("This Impala daemon is not ready to accept user requests. " 
+
-          "Status: Waiting for catalog update from the StateStore.", 
e.getMessage());
-    } catch (Exception e) {
-      fail("Failed to create exec request due to: " + 
ExceptionUtils.getStackTrace(e));
-    }
-  }
-
-  @Test
   public void TestGetTypeInfo() throws ImpalaException {
     // Verify that the correct number of types are returned.
     TMetadataOpRequest getInfoReq = new TMetadataOpRequest();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index a48fd0d..c264d0e 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -31,12 +31,13 @@ from time import sleep
 IMPALA_HOME = os.environ['IMPALA_HOME']
 CLUSTER_SIZE = 3
 NUM_COORDINATORS = CLUSTER_SIZE
-# The number of statestore subscribers is CLUSTER_SIZE (# of impalad) + 1 (for 
catalogd).
-NUM_SUBSCRIBERS = CLUSTER_SIZE + 1
 
+# Additional args passed to respective daemon command line.
 IMPALAD_ARGS = 'impalad_args'
 STATESTORED_ARGS = 'state_store_args'
 CATALOGD_ARGS = 'catalogd_args'
+# Additional args passed to the start-impala-cluster script.
+START_ARGS = 'start_args'
 
 class CustomClusterTestSuite(ImpalaTestSuite):
   """Every test in a test suite deriving from this class gets its own Impala 
cluster.
@@ -81,7 +82,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     pass
 
   @staticmethod
-  def with_args(impalad_args=None, statestored_args=None, catalogd_args=None):
+  def with_args(impalad_args=None, statestored_args=None, catalogd_args=None, 
start_args=None):
     """Records arguments to be passed to a cluster by adding them to the 
decorated
     method's func_dict"""
     def decorate(func):
@@ -91,6 +92,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
         func.func_dict[STATESTORED_ARGS] = statestored_args
       if catalogd_args is not None:
         func.func_dict[CATALOGD_ARGS] = catalogd_args
+      if start_args is not None:
+        func.func_dict[START_ARGS] = start_args
       return func
     return decorate
 
@@ -99,6 +102,9 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     for arg in [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS]:
       if arg in method.func_dict:
         cluster_args.append("--%s=\"%s\" " % (arg, method.func_dict[arg]))
+    if START_ARGS in method.func_dict:
+      cluster_args.append(method.func_dict[START_ARGS])
+
     # Start a clean new cluster before each test
     self._start_impala_cluster(cluster_args)
     super(CustomClusterTestSuite, self).setup_class()
@@ -117,7 +123,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
   @classmethod
   def _start_impala_cluster(cls, options, log_dir=os.getenv('LOG_DIR', 
"/tmp/"),
       cluster_size=CLUSTER_SIZE, num_coordinators=NUM_COORDINATORS,
-      use_exclusive_coordinators=False, log_level=1):
+      use_exclusive_coordinators=False, log_level=1, 
expected_num_executors=CLUSTER_SIZE):
     cls.impala_log_dir = log_dir
     cmd = [os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'),
            '--cluster_size=%d' % cluster_size,
@@ -133,9 +139,14 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     statestored = cls.cluster.statestored
     if statestored is None:
       raise Exception("statestored was not found")
-    statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60)
+
+    # The number of statestore subscribers is
+    # cluster_size (# of impalad) + 1 (for catalogd).
+    expected_subscribers = cluster_size + 1
+
+    statestored.service.wait_for_live_subscribers(expected_subscribers, 
timeout=60)
     for impalad in cls.cluster.impalads:
-      impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE, 
timeout=60)
+      impalad.service.wait_for_num_known_live_backends(expected_num_executors, 
timeout=60)
 
   def assert_impalad_log_contains(self, level, line_regex, expected_count=1):
     """

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/tests/common/impala_cluster.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index d27b89a..3fbcacf 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -169,6 +169,7 @@ class Process(object):
       assert 0, "No processes %s found" % self.cmd
     LOG.info('Killing: %s (PID: %d) with signal %s'  % (' '.join(self.cmd), 
pid, signal))
     exec_process("kill -%d %d" % (signal, pid))
+
     return pid
 
   def restart(self):
@@ -222,8 +223,9 @@ class ImpaladProcess(BaseImpalaProcess):
   def start(self, wait_until_ready=True):
     """Starts the impalad and waits until the service is ready to accept 
connections."""
     super(ImpaladProcess, self).start()
-    self.service.wait_for_metric_value('impala-server.ready',
-        expected_value=1, timeout=30)
+    if wait_until_ready:
+      self.service.wait_for_metric_value('impala-server.ready',
+                                         expected_value=1, timeout=30)
 
 
 # Represents a statestored process

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 3fb73bc..be33328 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -78,7 +78,7 @@ class BaseImpalaService(object):
         LOG.error(e)
 
       if value == expected_value:
-        LOG.info("Metric '%s' has reach desired value: %s" % (metric_name, 
value))
+        LOG.info("Metric '%s' has reached desired value: %s" % (metric_name, 
value))
         return value
       else:
         LOG.info("Waiting for metric value '%s'=%s. Current value: %s" %

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/tests/custom_cluster/test_catalog_wait.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_catalog_wait.py 
b/tests/custom_cluster/test_catalog_wait.py
new file mode 100644
index 0000000..307e8a9
--- /dev/null
+++ b/tests/custom_cluster/test_catalog_wait.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from time import sleep
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+class TestCatalogWait(CustomClusterTestSuite):
+  """Impalad coordinators must wait for their local replica of the catalog to 
be
+     initialized from the statestore prior to opening up client ports.
+     This test simulates a failed or slow catalog on impalad startup."""
+
+  def expect_connection(self, impalad):
+    impalad.service.create_beeswax_client()
+    impalad.service.create_hs2_client()
+
+  def expect_no_connection(self, impalad):
+    with pytest.raises(Exception) as e:
+      impalad.service.create_beeswax_client()
+      assert 'Could not connect to' in str(e.value)
+
+    with pytest.raises(Exception) as e:
+      impalad.service.create_hs2_client()
+      assert 'Could not connect to' in str(e.value)
+
+  @pytest.mark.execute_serially
+  def test_delayed_catalog(self):
+    """ Tests client interactions with the cluster when one of the daemons,
+        impalad[2], is delayed in initializing its local catalog replica."""
+
+    # On startup, expect only two executors to be registered.
+    self._start_impala_cluster(["--catalog_init_delays=0,0,200000"],
+                               expected_num_executors=2)
+
+    # Expect that impalad[2] is not ready.
+    
self.cluster.impalads[2].service.wait_for_metric_value('impala-server.ready', 
0);
+
+    # Expect that impalad[0,1] are both ready and with initialized catalog.
+    
self.cluster.impalads[0].service.wait_for_metric_value('impala-server.ready', 
1);
+    self.cluster.impalads[0].service.wait_for_metric_value('catalog.ready', 1);
+    
self.cluster.impalads[1].service.wait_for_metric_value('impala-server.ready', 
1);
+    self.cluster.impalads[1].service.wait_for_metric_value('catalog.ready', 1);
+
+    # Expect that connections can be made to impalads[0,1], but not to 
impalads[2].
+    self.expect_connection(self.cluster.impalads[0])
+    self.expect_connection(self.cluster.impalads[1])
+    self.expect_no_connection(self.cluster.impalads[2])
+
+    # Issues a query to check that impalad[2] does not evaluate any fragments
+    # and does not prematurely register itself as an executor. The former is
+    # verified via query fragment metrics and the latter would fail if 
registered
+    # but unable to process fragments.
+    client0 = self.cluster.impalads[0].service.create_beeswax_client()
+    client1 = self.cluster.impalads[1].service.create_beeswax_client()
+
+    self.execute_query_expect_success(client0, "select * from 
functional.alltypes");
+    self.execute_query_expect_success(client1, "select * from 
functional.alltypes");
+
+    # Check that fragments were run on impalad[0,1] and none on impalad[2].
+    # Each ready impalad runs a fragment per query and one coordinator 
fragment. With
+    # two queries, one coordinated per ready impalad, that should be 3 total 
fragments.
+    
self.cluster.impalads[0].service.wait_for_metric_value('impala-server.num-fragments',
 3);
+    
self.cluster.impalads[1].service.wait_for_metric_value('impala-server.num-fragments',
 3);
+    
self.cluster.impalads[2].service.wait_for_metric_value('impala-server.num-fragments',
 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/tests/custom_cluster/test_coordinators.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_coordinators.py 
b/tests/custom_cluster/test_coordinators.py
index 4d0b814..eb5a125 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -93,6 +93,7 @@ class TestCoordinators(CustomClusterTestSuite):
       """Connects to the coordinator node, runs a query and verifies that 
certain
         operators are executed on 'expected_num_of_executors' nodes."""
       coordinator = self.cluster.impalads[0]
+      client = None
       try:
         client = coordinator.service.create_beeswax_client()
         assert client is not None
@@ -106,6 +107,7 @@ class TestCoordinators(CustomClusterTestSuite):
           elif rows['operator'] == '01:AGGREGATE':
             assert rows['num_hosts'] == expected_num_of_executors
       finally:
+        assert client is not None
         client.close()
 
     # Cluster config where the coordinator can execute query fragments

Reply via email to