IMPALA-1760: Implement shutdown command

This is the same patch except with fixes for the test failures
on EC and S3 noted in the JIRA.

This allows graceful shutdown of executors and partially graceful
shutdown of coordinators (new operations fail, old operations can
continue).

Details:
* In order to allow future admin commands, this is implemented with
  function-like syntax and does not add any reserved words.
* ALL privilege is required on the server.
* The coordinator impalad that the client is connected to can be shut
  down directly with ":shutdown()".
* Remote shutdown of another impalad is supported, e.g. with
  ":shutdown('hostname')", so that non-coordinators can be shut down
  and for the convenience of the client, which does not have to
  connect to the specific impalad. There is no assumption that the
  other impalad is registered in the statestore; just that the
  coordinator can connect to the other daemon's thrift endpoint.
  This simplifies things and allows shutdown in various important
  cases, e.g. statestore down.
* The shutdown time limit can be overridden to force a quicker or
  slower shutdown by specifying a deadline in seconds after the
  statement is executed.
* While a daemon is shutting down, a banner is shown on its root debug page.
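
The local-versus-remote choice described above can be sketched as follows.
This is a minimal illustration in Python, not the committed C++ code: the
function name, argument names and the "local" return value are hypothetical,
and the host names and port 22000 in the usage lines are made-up values.
The rule itself (use the in-process path when no host is given or when the
host and port exactly match the local daemon) follows ExecShutdownRequest()
in this patch.

```python
from typing import Optional, Tuple, Union


def resolve_shutdown_target(
    host: Optional[str],
    port: int,
    local_host: str,
    local_be_port: int,
) -> Union[str, Tuple[str, int]]:
    """Return "local" or the (host, port) pair that should receive the RPC.

    Hypothetical helper for illustration only.
    """
    # A port of 0 means "not specified": fall back to the local backend port.
    effective_port = port if port != 0 else local_be_port
    # No host, or an exact host/port match: shut down in-process so that an
    # RPC failure cannot prevent the shutdown.
    if host is None or (host == local_host and effective_port == local_be_port):
        return "local"
    return (host, effective_port)


# Illustrative values only.
print(resolve_shutdown_target(None, 0, "coord-1", 22000))
print(resolve_shutdown_target("exec-7", 0, "coord-1", 22000))
```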

Workflow:
1. (if a coordinator) clients are prevented from submitting
  queries to this coordinator via some out-of-band mechanism,
  e.g. load balancer
2. the shutdown process is started via ":shutdown()"
3. a bit is set in the statestore and propagated to coordinators,
  which stop scheduling fragment instances on this daemon
  (if an executor).
4. the query startup grace period (which is ideally set to the AC
  queueing delay plus some additional leeway) expires
5. once the daemon is quiesced (i.e. no fragments, no registered
  queries), it shuts itself down.
6. If the daemon does not successfully quiesce (e.g. rogue clients,
  long-running queries), after a longer timeout (counted from the start
  of the shutdown process) it will shut down anyway.
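
Steps 4 to 6 amount to a small decision rule, sketched here in Python under
stated assumptions. The function and argument names are hypothetical; the
defaults mirror the shutdown_grace_period_s (120) and shutdown_deadline_s
(60 * 60) flags added by this patch. Letting the deadline win when it is
shorter than the grace period is an assumption of this sketch, not something
the commit message states.

```python
def shutdown_action(
    elapsed_s: float,
    active_fragments: int,
    registered_queries: int,
    grace_period_s: float = 120,
    deadline_s: float = 60 * 60,
) -> str:
    """Return "exit" or "wait" for a daemon whose shutdown began elapsed_s ago."""
    # Step 6: past the deadline, shut down regardless of running work.
    if elapsed_s >= deadline_s:
        return "exit"
    # Step 4: never exit before the startup grace period has expired.
    if elapsed_s < grace_period_s:
        return "wait"
    # Step 5: exit once quiesced (no fragments, no registered queries).
    quiesced = active_fragments == 0 and registered_queries == 0
    return "exit" if quiesced else "wait"
```

Note that an idle daemon still returns "wait" until the grace period has
passed, which is the minimum shutdown latency listed under Limitations.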

What this does:
* Executors can be shut down without causing a service-wide outage
* Shutting down an executor will not disrupt any short-running queries
  and will wait for long-running queries up to a threshold.
* Coordinators can be shut down without query failures only if
  there is an out-of-band mechanism to prevent submission of more
  queries to the shut down coordinator. If queries are submitted to
  a coordinator after shutdown has started, they will fail.
* Long-running queries or other issues (e.g. stuck fragments) will
  slow down but not prevent eventual shutdown.

Limitations:
* The startup grace period needs to be configured to be greater than
  the latency of statestore updates + scheduling + admission +
  coordinator startup. Otherwise a coordinator may send a
  fragment instance to the shutting down impalad. (We could
  automate this configuration as a follow-on.)
* The startup grace period means a minimum latency for shutdown,
  even if the cluster is idle.
* We depend on the statestore detecting the process going down
  if queries are still running on that backend when the timeout
  expires. This may still be subject to existing problems,
  e.g. IMPALA-2990.
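
The first limitation is simple arithmetic, shown here as a hedged Python
sketch. The function names are hypothetical and the latencies in the usage
line are invented for illustration; real values have to be measured on the
cluster in question.

```python
def min_grace_period_s(
    statestore_update_s: float,
    scheduling_s: float,
    admission_s: float,
    coordinator_startup_s: float,
) -> float:
    """Sum of the latencies that the startup grace period must exceed."""
    return statestore_update_s + scheduling_s + admission_s + coordinator_startup_s


def grace_period_is_sufficient(configured_s: float, required_s: float) -> bool:
    """The configured grace period must be strictly greater than the sum."""
    return configured_s > required_s


# Invented latencies: 2s statestore, 1s scheduling, 60s admission, 5s startup.
required = min_grace_period_s(2, 1, 60, 5)
print(required, grace_period_is_sufficient(120, required))
```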

Tests:
* Added parser, analysis and authorization tests.
* End-to-end test of shutting down impalads.
* End-to-end test of shutting down then restarting an executor while
  queries are running.
* End-to-end test of shutting down a coordinator
  - New queries cannot be started on the coordinator; existing queries continue to run
  - Exercises various Beeswax and HS2 operations.

Change-Id: I8f3679ef442745a60a0ab97c4e9eac437aef9463
Reviewed-on: http://gerrit.cloudera.org:8080/11484
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f46de211
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f46de211
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f46de211

Branch: refs/heads/master
Commit: f46de21140f3bb483884fc49f5ded7afc466faac
Parents: 48640b5
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Thu May 17 11:30:25 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Wed Sep 26 01:28:36 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/backend-client.h                 |   8 +
 be/src/runtime/client-cache.h                   |  42 ++-
 be/src/runtime/coordinator-backend-state.cc     |  55 ++--
 be/src/scheduling/scheduler.cc                  |   9 +-
 be/src/service/client-request-state.cc          |  43 +++
 be/src/service/client-request-state.h           |   6 +-
 be/src/service/impala-beeswax-server.cc         |   4 +
 be/src/service/impala-hs2-server.cc             |  10 +
 be/src/service/impala-http-handler.cc           |   1 +
 be/src/service/impala-internal-service.cc       |   8 +
 be/src/service/impala-internal-service.h        |   2 +
 be/src/service/impala-server.cc                 | 128 +++++++-
 be/src/service/impala-server.h                  |  80 ++++-
 be/src/testutil/fault-injection-util.h          |   1 +
 be/src/util/default-path-handlers.cc            |  13 +-
 common/thrift/Frontend.thrift                   |  28 ++
 common/thrift/ImpalaInternalService.thrift      |  35 +++
 common/thrift/StatestoreService.thrift          |   4 +
 common/thrift/Types.thrift                      |   3 +-
 common/thrift/generate_error_codes.py           |   2 +
 fe/src/main/cup/sql-parser.cup                  |  13 +
 .../org/apache/impala/analysis/AdminFnStmt.java | 165 +++++++++++
 .../apache/impala/analysis/AnalysisContext.java |   6 +
 .../java/org/apache/impala/analysis/Expr.java   |  62 ++++
 .../apache/impala/analysis/LimitElement.java    |  56 +---
 .../org/apache/impala/service/Frontend.java     |   8 +
 .../impala/analysis/AnalyzeStmtsTest.java       |  50 ++++
 .../impala/analysis/AuthorizationStmtTest.java  |  13 +
 .../org/apache/impala/analysis/ParserTest.java  |  35 +++
 .../org/apache/impala/analysis/ToSqlTest.java   |  32 +-
 tests/common/impala_cluster.py                  |  13 +-
 tests/common/impala_service.py                  |  20 +-
 tests/custom_cluster/test_restart_services.py   | 296 ++++++++++++++++++-
 tests/hs2/hs2_test_suite.py                     |  14 +-
 tests/hs2/test_fetch_first.py                   |   2 +
 www/backends.tmpl                               |   2 +
 www/root.tmpl                                   |  10 +-
 37 files changed, 1140 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/runtime/backend-client.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index 0e35999..a0f27c7 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -104,6 +104,14 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
     ImpalaInternalServiceClient::recv_PublishFilter(_return);
   }
 
+  void RemoteShutdown(TRemoteShutdownResult& _return, const TRemoteShutdownParams& params,
+      bool* send_done) {
+    DCHECK(!*send_done);
+    ImpalaInternalServiceClient::send_RemoteShutdown(params);
+    *send_done = true;
+    ImpalaInternalServiceClient::recv_RemoteShutdown(_return);
+  }
+
 #pragma clang diagnostic pop
 
  private:

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/runtime/client-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index 500ad59..21b323d 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -27,10 +27,11 @@
 #include <gutil/strings/substitute.h>
 
 #include "catalog/catalog-service-client-wrapper.h"
-#include "runtime/client-cache-types.h"
-#include "util/metrics.h"
 #include "rpc/thrift-client.h"
 #include "rpc/thrift-util.h"
+#include "runtime/client-cache-types.h"
+#include "util/debug-util.h"
+#include "util/metrics.h"
 
 #include "common/status.h"
 
@@ -261,6 +262,43 @@ class ClientConnection {
     return Status::OK();
   }
 
+  /// Return struct for DoRpcWithRetry() that allows callers to distinguish between
+  /// failures in getting a client and failures sending the RPC.
+  struct RpcStatus {
+    Status status;
+
+    // Set to true if 'status' is not OK and the error occurred while getting the client.
+    bool client_error;
+
+    static RpcStatus OK() { return {Status::OK(), false}; }
+  };
+
+  /// Helper that retries constructing a client and calling DoRpc() up to three times
+  /// and handles both RPC failures and failures to get a client from 'client_cache'.
+  /// 'debug_fn' is a Status-returning function that can be used to inject errors into
+  /// the RPC.
+  template <class F, class DebugF, class Request, class Response>
+  static RpcStatus DoRpcWithRetry(ClientCache<T>* client_cache, TNetworkAddress address,
+      const F& f, const Request& request, const DebugF& debug_fn, Response* response) {
+    Status rpc_status;
+    Status client_status;
+
+    // Try to send the RPC 3 times before failing.
+    for (int i = 0; i < 3; ++i) {
+      ImpalaBackendConnection client(client_cache, address, &client_status);
+      if (!client_status.ok()) continue;
+
+      rpc_status = debug_fn();
+      if (!rpc_status.ok()) continue;
+
+      rpc_status = client.DoRpc(f, request, response);
+      if (rpc_status.ok()) break;
+    }
+    if (!client_status.ok()) return {client_status, true};
+    if (!rpc_status.ok()) return {rpc_status, false};
+    return RpcStatus::OK();
+  }
+
   /// In certain cases, the server may take longer to provide an RPC response than
   /// the configured socket timeout. Callers may wish to retry receiving the response.
   /// This is safe if and only if DoRpc() returned RPC_RECV_TIMEOUT.
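
The retry pattern introduced by DoRpcWithRetry() in this file can be
illustrated with a minimal Python sketch. It is not the C++ implementation:
the names below are hypothetical, errors are modelled as strings with None
meaning success, and the three callables stand in for obtaining a client,
the debug/fault-injection hook, and the RPC itself.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional, Tuple


@dataclass
class RpcStatus:
    error: Optional[str]  # None means the RPC succeeded
    client_error: bool    # True if the failure occurred while getting the client


def do_rpc_with_retry(
    get_client: Callable[[], Tuple[Any, Optional[str]]],
    debug_fn: Callable[[], Optional[str]],
    do_rpc: Callable[[Any], Optional[str]],
    attempts: int = 3,
) -> RpcStatus:
    client_err: Optional[str] = None
    rpc_err: Optional[str] = None
    for _ in range(attempts):
        # A fresh client is requested on every attempt.
        client, client_err = get_client()
        if client_err is not None:
            continue
        # Hook that lets tests inject an error before the RPC is sent.
        rpc_err = debug_fn()
        if rpc_err is not None:
            continue
        rpc_err = do_rpc(client)
        if rpc_err is None:
            break
    # Report a connection failure separately from a failed RPC.
    if client_err is not None:
        return RpcStatus(client_err, True)
    if rpc_err is not None:
        return RpcStatus(rpc_err, False)
    return RpcStatus(None, False)
```

Returning the client_error flag lets a caller log "failed to connect"
separately from "rpc failed" without keeping two status variables.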

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 2d97b54..95484a4 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -377,38 +377,29 @@ bool Coordinator::BackendState::Cancel() {
   VLOG_QUERY << "sending CancelQueryFInstances rpc for query_id=" << PrintId(query_id()) <<
       " backend=" << TNetworkAddressToString(impalad_address());
 
-  Status rpc_status;
-  Status client_status;
-  // Try to send the RPC 3 times before failing.
-  for (int i = 0; i < 3; ++i) {
-    ImpalaBackendConnection backend_client(ExecEnv::GetInstance()->impalad_client_cache(),
-        impalad_address(), &client_status);
-    if (!client_status.ok()) continue;
-
-    rpc_status = DebugAction(query_ctx().client_request.query_options,
-        "COORD_CANCEL_QUERY_FINSTANCES_RPC");
-    if (!rpc_status.ok()) continue;
-
-    // The return value 'dummy' is ignored as it's only set if the fragment
-    // instance cannot be found in the backend. The fragment instances of a query
-    // can all be cancelled locally in a backend due to RPC failure to
-    // coordinator. In which case, the query state can be gone already.
-    rpc_status = backend_client.DoRpc(
-        &ImpalaBackendClient::CancelQueryFInstances, params, &dummy);
-    if (rpc_status.ok()) break;
-  }
-  if (!client_status.ok()) {
-    status_.MergeStatus(client_status);
-    VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id())
-               << " failed to connect to " << TNetworkAddressToString(impalad_address())
-               << " :" << client_status.msg().msg();
-    return true;
-  }
-  if (!rpc_status.ok()) {
-    status_.MergeStatus(rpc_status);
-    VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id())
-               << " rpc to " << TNetworkAddressToString(impalad_address())
-               << " failed: " << rpc_status.msg().msg();
+
+  // The return value 'dummy' is ignored as it's only set if the fragment
+  // instance cannot be found in the backend. The fragment instances of a query
+  // can all be cancelled locally in a backend due to RPC failure to
+  // coordinator. In which case, the query state can be gone already.
+  ImpalaBackendConnection::RpcStatus rpc_status = ImpalaBackendConnection::DoRpcWithRetry(
+      ExecEnv::GetInstance()->impalad_client_cache(), impalad_address(),
+      &ImpalaBackendClient::CancelQueryFInstances, params,
+      [this] () {
+        return DebugAction(query_ctx().client_request.query_options,
+            "COORD_CANCEL_QUERY_FINSTANCES_RPC");
+      }, &dummy);
+  if (!rpc_status.status.ok()) {
+    status_.MergeStatus(rpc_status.status);
+    if (rpc_status.client_error) {
+      VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id())
+                 << " failed to connect to " << TNetworkAddressToString(impalad_address())
+                 << " :" << rpc_status.status.msg().msg();
+    } else {
+      VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id())
+                 << " rpc to " << TNetworkAddressToString(impalad_address())
+                 << " failed: " << rpc_status.status.msg().msg();
+    }
     return true;
   }
   return true;

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index ae4f049..43bf199 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -190,7 +190,14 @@ void Scheduler::UpdateMembership(
            << TNetworkAddressToString(local_backend_descriptor_.address) << ")";
       continue;
     }
-    if (be_desc.is_executor) {
+    if (be_desc.is_quiescing) {
+      // Make sure backends that are shutting down are not scheduled on.
+      auto it = current_executors_.find(item.key);
+      if (it != current_executors_.end()) {
+        new_executors_config->RemoveBackend(it->second);
+        current_executors_.erase(it);
+      }
+    } else if (be_desc.is_executor) {
       new_executors_config->AddBackend(be_desc);
       current_executors_.insert(make_pair(item.key, be_desc));
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 4880ac6..f59d65b 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -21,6 +21,7 @@
 #include <limits>
 #include <gutil/strings/substitute.h>
 
+#include "runtime/backend-client.h"
 #include "runtime/coordinator.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
@@ -51,6 +52,7 @@ using namespace apache::thrift;
 using namespace beeswax;
 using namespace strings;
 
+DECLARE_int32(be_port);
 DECLARE_int32(catalog_service_port);
 DECLARE_string(catalog_service_host);
 DECLARE_int64(max_result_cache_size);
@@ -230,6 +232,9 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) {
       }
       break;
     }
+    case TStmtType::ADMIN_FN:
+      DCHECK(exec_request_.admin_request.type == TAdminRequestType::SHUTDOWN);
+      return ExecShutdownRequest();
     default:
       stringstream errmsg;
       errmsg << "Unknown exec request stmt type: " << exec_request_.stmt_type;
@@ -613,6 +618,44 @@ Status ClientRequestState::ExecDdlRequest() {
   return Status::OK();
 }
 
+Status ClientRequestState::ExecShutdownRequest() {
+  const TShutdownParams& request = exec_request_.admin_request.shutdown_params;
+  int port = request.__isset.backend && request.backend.port != 0 ? request.backend.port :
+                                                                    FLAGS_be_port;
+  // Use the local shutdown code path if the host is unspecified or if it exactly matches
+  // the configured host/port. This avoids the possibility of RPC errors preventing
+  // shutdown.
+  if (!request.__isset.backend
+      || (request.backend.hostname == FLAGS_hostname && port == FLAGS_be_port)) {
+    TShutdownStatus shutdown_status;
+    int64_t deadline_s = request.__isset.deadline_s ? request.deadline_s : -1;
+    RETURN_IF_ERROR(parent_server_->StartShutdown(deadline_s, &shutdown_status));
+    SetResultSet({ImpalaServer::ShutdownStatusToString(shutdown_status)});
+    return Status::OK();
+  }
+  TNetworkAddress addr = MakeNetworkAddress(request.backend.hostname, port);
+
+  TRemoteShutdownParams params;
+  if (request.__isset.deadline_s) params.__set_deadline_s(request.deadline_s);
+  TRemoteShutdownResult resp;
+  VLOG_QUERY << "Sending Shutdown RPC to " << TNetworkAddressToString(addr);
+  ImpalaBackendConnection::RpcStatus rpc_status = ImpalaBackendConnection::DoRpcWithRetry(
+      ExecEnv::GetInstance()->impalad_client_cache(), addr,
+      &ImpalaBackendClient::RemoteShutdown, params,
+      [this]() { return DebugAction(query_options(), "CRS_SHUTDOWN_RPC"); }, &resp);
+  if (!rpc_status.status.ok()) {
+    VLOG_QUERY << "RemoteShutdown query_id= " << PrintId(query_id())
+               << " failed to send RPC to " << TNetworkAddressToString(addr) << " :"
+               << rpc_status.status.msg().msg();
+    return rpc_status.status;
+  }
+
+  Status shutdown_status(resp.status);
+  RETURN_IF_ERROR(shutdown_status);
+  SetResultSet({ImpalaServer::ShutdownStatusToString(resp.shutdown_status)});
+  return Status::OK();
+}
+
 void ClientRequestState::Done() {
   MarkActive();
   // Make sure we join on wait_thread_ before we finish (and especially before this object

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 0f6f514..d28ca3f 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -406,8 +406,6 @@ class ClientRequestState {
   bool user_has_profile_access_ = true;
 
   TResultSetMetadata result_metadata_; // metadata for select query
-  RowBatch* current_batch_ = nullptr; // the current row batch; only applicable if coord is set
-  int current_batch_row_ = 0 ; // number of rows fetched within the current batch
   int num_rows_fetched_ = 0; // number of rows fetched by client for the entire query
 
   /// True if a fetch was attempted by a client, regardless of whether a result set
@@ -456,8 +454,8 @@ class ClientRequestState {
   /// queries (e.g., compute stats) or dml (e.g., create table as select)
   Status ExecDdlRequest() WARN_UNUSED_RESULT;
 
-  /// Executes a LOAD DATA
-  Status ExecLoadDataRequest() WARN_UNUSED_RESULT;
+  /// Executes a shut down request.
+  Status ExecShutdownRequest() WARN_UNUSED_RESULT;
 
   /// Core logic of Wait(). Does not update operation_state_/query_status_.
   Status WaitInternal() WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 3ecbe80..c3570b2 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -52,6 +52,8 @@ namespace impala {
 
 void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
   VLOG_QUERY << "query(): query=" << query.query;
+  RAISE_IF_ERROR(CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
+
   ScopedSessionState session_handle(this);
   shared_ptr<SessionState> session;
   RAISE_IF_ERROR(
@@ -87,6 +89,7 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
 void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
     const LogContextId& client_ctx) {
   VLOG_QUERY << "executeAndWait(): query=" << query.query;
+  RAISE_IF_ERROR(CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
   ScopedSessionState session_handle(this);
   shared_ptr<SessionState> session;
   RAISE_IF_ERROR(
@@ -140,6 +143,7 @@ void ImpalaServer::explain(QueryExplanation& query_explanation, const Query& que
   // Translate Beeswax Query to Impala's QueryRequest and then set the explain plan bool
   // before shipping to FE
   VLOG_QUERY << "explain(): query=" << query.query;
+  RAISE_IF_ERROR(CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
   ScopedSessionState session_handle(this);
   RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 353c190..881b19a 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -266,6 +266,7 @@ Status ImpalaServer::TExecuteStatementReqToTQueryContext(
 // HiveServer2 API
 void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
     const TOpenSessionReq& request) {
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   // Generate session ID and the secret
   TUniqueId session_id;
@@ -384,6 +385,7 @@ void ImpalaServer::CloseSession(TCloseSessionResp& return_val,
 void ImpalaServer::GetInfo(TGetInfoResp& return_val,
     const TGetInfoReq& request) {
   VLOG_QUERY << "GetInfo(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TUniqueId session_id;
   TUniqueId secret;
@@ -412,6 +414,7 @@ void ImpalaServer::GetInfo(TGetInfoResp& return_val,
 void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
     const TExecuteStatementReq& request) {
   VLOG_QUERY << "ExecuteStatement(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
   // We ignore the runAsync flag here: Impala's queries will always run asynchronously,
   // and will block on fetch. To the client, this looks like Hive's synchronous mode; the
   // difference is that rows are not available when ExecuteStatement() returns.
@@ -490,6 +493,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
 void ImpalaServer::GetTypeInfo(TGetTypeInfoResp& return_val,
     const TGetTypeInfoReq& request) {
   VLOG_QUERY << "GetTypeInfo(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TMetadataOpRequest req;
   req.__set_opcode(TMetadataOpcode::GET_TYPE_INFO);
@@ -508,6 +512,7 @@ void ImpalaServer::GetTypeInfo(TGetTypeInfoResp& return_val,
 void ImpalaServer::GetCatalogs(TGetCatalogsResp& return_val,
     const TGetCatalogsReq& request) {
   VLOG_QUERY << "GetCatalogs(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TMetadataOpRequest req;
   req.__set_opcode(TMetadataOpcode::GET_CATALOGS);
@@ -526,6 +531,7 @@ void ImpalaServer::GetCatalogs(TGetCatalogsResp& return_val,
 void ImpalaServer::GetSchemas(TGetSchemasResp& return_val,
     const TGetSchemasReq& request) {
   VLOG_QUERY << "GetSchemas(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TMetadataOpRequest req;
   req.__set_opcode(TMetadataOpcode::GET_SCHEMAS);
@@ -544,6 +550,7 @@ void ImpalaServer::GetSchemas(TGetSchemasResp& return_val,
 void ImpalaServer::GetTables(TGetTablesResp& return_val,
     const TGetTablesReq& request) {
   VLOG_QUERY << "GetTables(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TMetadataOpRequest req;
   req.__set_opcode(TMetadataOpcode::GET_TABLES);
@@ -562,6 +569,7 @@ void ImpalaServer::GetTables(TGetTablesResp& return_val,
 void ImpalaServer::GetTableTypes(TGetTableTypesResp& return_val,
     const TGetTableTypesReq& request) {
   VLOG_QUERY << "GetTableTypes(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TMetadataOpRequest req;
   req.__set_opcode(TMetadataOpcode::GET_TABLE_TYPES);
@@ -581,6 +589,7 @@ void ImpalaServer::GetTableTypes(TGetTableTypesResp& return_val,
 void ImpalaServer::GetColumns(TGetColumnsResp& return_val,
     const TGetColumnsReq& request) {
   VLOG_QUERY << "GetColumns(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TMetadataOpRequest req;
   req.__set_opcode(TMetadataOpcode::GET_COLUMNS);
@@ -599,6 +608,7 @@ void ImpalaServer::GetColumns(TGetColumnsResp& return_val,
 void ImpalaServer::GetFunctions(TGetFunctionsResp& return_val,
     const TGetFunctionsReq& request) {
   VLOG_QUERY << "GetFunctions(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TMetadataOpRequest req;
   req.__set_opcode(TMetadataOpcode::GET_FUNCTIONS);

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 05d3c58..5435e7a 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -844,6 +844,7 @@ void ImpalaHttpHandler::BackendsHandler(const Webserver::ArgumentMap& args,
     backend_obj.AddMember("is_coordinator", backend.is_coordinator,
         document->GetAllocator());
     backend_obj.AddMember("is_executor", backend.is_executor, document->GetAllocator());
+    backend_obj.AddMember("is_quiescing", backend.is_quiescing, document->GetAllocator());
     backends_list.PushBack(backend_obj, document->GetAllocator());
   }
   document->AddMember("backends", backends_list, document->GetAllocator());

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/service/impala-internal-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index 864a1da..3e7b222 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -107,3 +107,11 @@ void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val,
   if (qs.get() == nullptr) return;
   qs->PublishFilter(params);
 }
+
+void ImpalaInternalService::RemoteShutdown(TRemoteShutdownResult& return_val,
+    const TRemoteShutdownParams& params) {
+  FAULT_INJECTION_RPC_DELAY(RPC_REMOTESHUTDOWN);
+  Status status = impala_server_->StartShutdown(
+      params.__isset.deadline_s ? params.deadline_s : -1, &return_val.shutdown_status);
+  status.ToThrift(&return_val.status);
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/service/impala-internal-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index 8d5ddd5..971670a 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -41,6 +41,8 @@ class ImpalaInternalService : public ImpalaInternalServiceIf {
       const TUpdateFilterParams& params);
   virtual void PublishFilter(TPublishFilterResult& return_val,
       const TPublishFilterParams& params);
+  virtual void RemoteShutdown(TRemoteShutdownResult& return_val,
+      const TRemoteShutdownParams& params);
 
  private:
   ImpalaServer* impala_server_;

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 091e00f..c46bebe 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -210,6 +210,19 @@ DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and co
 DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query "
     "fragments.");
 
+// TODO: can we automatically choose a startup grace period based on the max admission
+// control queue timeout + some margin for error?
+DEFINE_int64(shutdown_grace_period_s, 120, "Shutdown startup grace period in seconds. "
+    "When the shutdown process is started for this daemon, it will wait for at least the "
+    "startup grace period before shutting down. This gives time for updated cluster "
+    "membership information to propagate to all coordinators and for fragment instances "
+    "that were scheduled based on old cluster membership to start executing (and "
+    "therefore be reflected in the metrics used to detect quiescence).");
+
+DEFINE_int64(shutdown_deadline_s, 60 * 60, "Default time limit in seconds for the shut "
+    "down process. If this duration elapses after the shut down process is started, "
+    "the daemon shuts down regardless of any running queries.");
+
 #ifndef NDEBUG
   DEFINE_int64(stress_metadata_loading_pause_injection_ms, 0, "Simulates metadata loading"
       "for a given query by injecting a sleep equivalent to this configuration in "
@@ -637,6 +650,7 @@ void ImpalaServer::LogQueryEvents(const ClientRequestState& request_state) {
     case TStmtType::EXPLAIN:
     case TStmtType::LOAD:
     case TStmtType::SET:
+    case TStmtType::ADMIN_FN:
     default:
       break;
   }
@@ -1663,8 +1677,13 @@ void ImpalaServer::MembershipCallback(
         VLOG(2) << "Error deserializing topic item with key: " << item.key;
         continue;
       }
-      // This is a new item - add it to the map of known backends.
-      known_backends_.insert(make_pair(item.key, backend_descriptor));
+      // This is a new or modified item - add it to the map of known backends.
+      auto it = known_backends_.find(item.key);
+      if (it != known_backends_.end()) {
+        it->second = backend_descriptor;
+      } else {
+        known_backends_.emplace_hint(it, item.key, backend_descriptor);
+      }
     }
 
     // Register the local backend in the statestore and update the list of known backends.
@@ -1754,7 +1773,14 @@ void ImpalaServer::MembershipCallback(
 void ImpalaServer::AddLocalBackendToStatestore(
     vector<TTopicDelta>* subscriber_topic_updates) {
   const string& local_backend_id = exec_env_->subscriber()->id();
-  if (known_backends_.find(local_backend_id) != known_backends_.end()) return;
+  bool is_quiescing = shutting_down_.Load() != 0;
+  auto it = known_backends_.find(local_backend_id);
+  // 'is_quiescing' can change during the lifetime of the Impalad - make sure that the
+  // membership reflects the current value.
+  if (it != known_backends_.end()
+      && is_quiescing == it->second.is_quiescing) {
+    return;
+  }
 
   TBackendDescriptor local_backend_descriptor;
   local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator);
@@ -1763,6 +1789,7 @@ void ImpalaServer::AddLocalBackendToStatestore(
   local_backend_descriptor.ip_address = exec_env_->ip_address();
   local_backend_descriptor.__set_proc_mem_limit(
       exec_env_->process_mem_tracker()->limit());
+  local_backend_descriptor.__set_is_quiescing(is_quiescing);
   const TNetworkAddress& krpc_address = exec_env_->krpc_address();
   DCHECK(IsResolvedAddress(krpc_address));
   local_backend_descriptor.__set_krpc_address(krpc_address);
@@ -1779,6 +1806,8 @@ void ImpalaServer::AddLocalBackendToStatestore(
     LOG(WARNING) << "Failed to serialize Impala backend descriptor for 
statestore topic:"
                  << " " << status.GetDetail();
     subscriber_topic_updates->pop_back();
+  } else if (it != known_backends_.end()) {
+    it->second.is_quiescing = is_quiescing;
   } else {
     known_backends_.insert(make_pair(item.key, local_backend_descriptor));
   }
@@ -2273,6 +2302,8 @@ Status ImpalaServer::Start(int32_t thrift_be_port, 
int32_t beeswax_port,
 }
 
 void ImpalaServer::Join() {
+  // The server shuts down by exiting the process, so just block here until the process
+  // exits.
   thrift_be_server_->Join();
   thrift_be_server_.reset();
 
@@ -2282,7 +2313,6 @@ void ImpalaServer::Join() {
     beeswax_server_.reset();
     hs2_server_.reset();
   }
-  shutdown_promise_.Get();
 }
 
 shared_ptr<ClientRequestState> ImpalaServer::GetClientRequestState(
@@ -2311,4 +2341,94 @@ void ImpalaServer::UpdateFilter(TUpdateFilterResult& 
result,
   }
   client_request_state->UpdateFilter(params);
 }
+
+Status ImpalaServer::CheckNotShuttingDown() const {
+  if (!IsShuttingDown()) return Status::OK();
+  return Status::Expected(ErrorMsg(
+      TErrorCode::SERVER_SHUTTING_DOWN, ShutdownStatusToString(GetShutdownStatus())));
+}
+
+TShutdownStatus ImpalaServer::GetShutdownStatus() const {
+  TShutdownStatus result;
+  int64_t shutdown_time = shutting_down_.Load();
+  DCHECK_GT(shutdown_time, 0);
+  int64_t shutdown_deadline = shutdown_deadline_.Load();
+  DCHECK_GT(shutdown_deadline, 0);
+  int64_t now = MonotonicMillis();
+  int64_t elapsed_ms = now - shutdown_time;
+  result.grace_remaining_ms =
+      max<int64_t>(0, FLAGS_shutdown_grace_period_s * 1000 - elapsed_ms);
+  result.deadline_remaining_ms =
+      max<int64_t>(0, shutdown_deadline - now);
+  result.finstances_executing =
+      ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue();
+  result.client_requests_registered = ImpaladMetrics::NUM_QUERIES_REGISTERED->GetValue();
+  return result;
+}
+
+string ImpalaServer::ShutdownStatusToString(const TShutdownStatus& shutdown_status) {
+  return Substitute("startup grace period left: $0, deadline left: $1, "
+      "fragment instances: $2, queries registered: $3",
+      PrettyPrinter::Print(shutdown_status.grace_remaining_ms, TUnit::TIME_MS),
+      PrettyPrinter::Print(shutdown_status.deadline_remaining_ms, TUnit::TIME_MS),
+      shutdown_status.finstances_executing, shutdown_status.client_requests_registered);
+}
+
+Status ImpalaServer::StartShutdown(
+    int64_t relative_deadline_s, TShutdownStatus* shutdown_status) {
+  DCHECK_GE(relative_deadline_s, -1);
+  if (relative_deadline_s == -1) relative_deadline_s = FLAGS_shutdown_deadline_s;
+  int64_t now = MonotonicMillis();
+  int64_t new_deadline = now + relative_deadline_s * 1000L;
+
+  bool set_deadline = false;
+  bool set_grace = false;
+  int64_t curr_deadline = shutdown_deadline_.Load();
+  while (curr_deadline == 0 || curr_deadline > new_deadline) {
+    // Set the deadline - it was either unset or later than the new one.
+    if (shutdown_deadline_.CompareAndSwap(curr_deadline, new_deadline)) {
+      set_deadline = true;
+      break;
+    }
+    curr_deadline = shutdown_deadline_.Load();
+  }
+
+  while (shutting_down_.Load() == 0) {
+    if (!shutting_down_.CompareAndSwap(0, now)) continue;
+    unique_ptr<Thread> t;
+    Status status =
+        Thread::Create("shutdown", "shutdown", [this] { ShutdownThread(); }, &t, false);
+    if (!status.ok()) {
+      LOG(ERROR) << "Failed to create shutdown thread: " << status.GetDetail();
+      return status;
+    }
+    set_grace = true;
+    break;
+  }
+  *shutdown_status = GetShutdownStatus();
+  // Show the full grace/limit times to avoid showing confusing intermediate values
+  // to the person running the statement.
+  if (set_grace) {
+    shutdown_status->grace_remaining_ms = FLAGS_shutdown_grace_period_s * 1000L;
+  }
+  if (set_deadline) shutdown_status->deadline_remaining_ms = relative_deadline_s * 1000L;
+  return Status::OK();
+}
+
+[[noreturn]] void ImpalaServer::ShutdownThread() {
+  while (true) {
+    SleepForMs(1000);
+    TShutdownStatus shutdown_status = GetShutdownStatus();
+    LOG(INFO) << "Shutdown status: " << ShutdownStatusToString(shutdown_status);
+    if (shutdown_status.grace_remaining_ms <= 0
+        && shutdown_status.finstances_executing == 0
+        && shutdown_status.client_requests_registered == 0) {
+      break;
+    } else if (shutdown_status.deadline_remaining_ms <= 0) {
+      break;
+    }
+  }
+  LOG(INFO) << "Shutdown complete, going down.";
+  exit(0);
+}
 }
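
Note on the deadline update in StartShutdown(): the loop only ever moves the
deadline earlier, and it retries when another thread changes the value in
between. A minimal standalone sketch of that pattern follows. It uses
std::atomic instead of Impala's AtomicInt64, and the names
'shutdown_deadline_ms' and 'LowerDeadline' are hypothetical, chosen only for
this illustration.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the server's deadline field; 0 means "not set".
std::atomic<int64_t> shutdown_deadline_ms{0};

// Installs 'new_deadline_ms' if no deadline is set yet or if it is earlier
// than the stored one. Returns true if this call changed the stored deadline.
bool LowerDeadline(int64_t new_deadline_ms) {
  assert(new_deadline_ms > 0);  // 0 is reserved for "not set".
  int64_t curr = shutdown_deadline_ms.load();
  while (curr == 0 || curr > new_deadline_ms) {
    // On failure, compare_exchange_weak stores the latest value in 'curr',
    // so the loop condition is re-checked against what another thread wrote.
    if (shutdown_deadline_ms.compare_exchange_weak(curr, new_deadline_ms)) {
      return true;
    }
  }
  return false;
}
```

A later call with a larger value leaves the stored deadline untouched, which
matches the documented rule that the deadline cannot be increased once
shutdown has started.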

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 14c33d1..7db5ce4 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -97,6 +97,40 @@ class QuerySchedule;
 /// Internally, the Membership callback thread also participates in startup:
 ///    - If services_started_, then register to the statestore as an executor.
 ///
+/// Shutdown
+/// --------
+/// Impala Server shutdown can be initiated by a remote shutdown command from another
+/// Impala daemon or by a local shutdown command from a user session. The shutdown
+/// sequence aims to quiesce the Impalad (i.e. drain it of any running finstances or
+/// client requests) then exit the process cleanly. The shutdown sequence is as follows:
+///
+/// 1. StartShutdown() is called to initiate the process.
+/// 2. The startup grace period starts, during which:
+///   - no new client requests are accepted. Clients can still interact with registered
+///     requests and sessions as normal.
+///   - the Impala daemon is marked in the statestore as quiescing, so coordinators
+///     will not schedule new fragments on it (once the statestore update propagates).
+///   - the Impala daemon continues to start executing any new fragments sent to it by
+///     coordinators. This is required because the query may have been submitted before
+///     the coordinator learned that the executor was quiescing. Delays occur for several
+///     reasons:
+///     -> Latency of membership propagation through the statestore
+///     -> Latency of query startup work including scheduling, admission control and
+///        fragment startup.
+///     -> Queuing delay in the admission controller (which may be unbounded).
+/// 3. The startup grace period elapses.
+/// 4. The background shutdown thread periodically checks to see if the Impala daemon is
+///    quiesced (i.e. no client requests are registered and no fragment instances are
+///    executing). If it is quiesced then it cleanly shuts down by exiting the process.
+/// 5. The shutdown deadline elapses. The Impala daemon exits regardless of whether
+///    it was successfully quiesced or not.
+///
+/// If shutdown is initiated again during this process, it does not cancel the existing
+/// shutdown but can decrease the deadline, e.g. if an administrator starts shutdown
+/// with a deadline of 1 hour, but then wants to shut down the cluster sooner, they can
+/// run the shutdown function again to set a shorter deadline. The deadline can't be
+/// increased after shutdown is started.
+///
 /// Locking
 /// -------
 /// This class is partially thread-safe. To ensure freedom from deadlock, if 
multiple
@@ -118,10 +152,6 @@ class QuerySchedule;
 /// * catalog_version_lock_
 /// * connection_to_sessions_map_lock_
 ///
-/// TODO: The state of a running query is currently not cleaned up if the
-/// query doesn't experience any errors at runtime and close() doesn't get 
called.
-/// The solution is to have a separate thread that cleans up orphaned
-/// query execution states after a timeout period.
 /// TODO: The same doesn't apply to the execution state of an individual plan
 /// fragment: the originating coordinator might die, but we can get notified of
 /// that via the statestore. This still needs to be implemented.
@@ -145,12 +175,9 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Start() by calling GetThriftBackendPort(), GetBeeswaxPort() or 
GetHS2Port().
   Status Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port);
 
-  /// Blocks until the server shuts down (by calling Shutdown()).
+  /// Blocks until the server shuts down.
   void Join();
 
-  /// Triggers service shutdown, by unblocking Join().
-  void Shutdown() { shutdown_promise_.Set(true); }
-
   /// ImpalaService rpcs: Beeswax API (implemented in impala-beeswax-server.cc)
   virtual void query(beeswax::QueryHandle& query_handle, const beeswax::Query& 
query);
   virtual void executeAndWait(beeswax::QueryHandle& query_handle,
@@ -372,6 +399,28 @@ class ImpalaServer : public ImpalaServiceIf,
   typedef boost::unordered_map<std::string, TBackendDescriptor> 
BackendDescriptorMap;
   const BackendDescriptorMap& GetKnownBackends();
 
+  /// Start the shutdown process. Return an error if it could not be started. Otherwise,
+  /// if it was successfully started by this or a previous call, return OK along with
+  /// information about the pending shutdown in 'shutdown_status'. 'relative_deadline_s'
+  /// is the deadline value in seconds to use, or -1 if we should use the default
+  /// deadline. See the "Shutdown" section of the class comment for the shutdown sequence.
+  Status StartShutdown(int64_t relative_deadline_s, TShutdownStatus* shutdown_status);
+
+  /// Returns true if a shutdown is in progress.
+  bool IsShuttingDown() const { return shutting_down_.Load() != 0; }
+
+  /// Returns an informational error about why a new operation could not be started
+  /// if the server is shutting down. Must be called before starting execution of a
+  /// new operation (e.g. a query).
+  Status CheckNotShuttingDown() const;
+
+  /// Return information about the status of a shutdown. Only valid to call if a shutdown
+  /// is in progress (i.e. IsShuttingDown() is true).
+  TShutdownStatus GetShutdownStatus() const;
+
+  /// Convert the shutdown status to a human-readable string.
+  static std::string ShutdownStatusToString(const TShutdownStatus& shutdown_status);
+
   // Mapping between query option names and levels
   QueryOptionLevels query_option_levels_;
 
@@ -852,6 +901,9 @@ class ImpalaServer : public ImpalaServiceIf,
       const std::string& authorized_proxy_config_delimiter,
       AuthorizedProxyMap* authorized_proxy_map);
 
+  /// Background thread that does the shutdown.
+  [[noreturn]] void ShutdownThread();
+
   /// Guards query_log_ and query_log_index_
   boost::mutex query_log_lock_;
 
@@ -1143,11 +1195,15 @@ class ImpalaServer : public ImpalaServiceIf,
   /// set after all services required for the server have been started.
   std::atomic_bool services_started_;
 
-  /// Set to true when this ImpalaServer should shut down.
-  Promise<bool> shutdown_promise_;
-};
-
+  /// Whether the Impala server shutdown process started. If 0, shutdown was not started.
+  /// Otherwise, this is the MonotonicMillis() value when the shutdown was started.
+  AtomicInt64 shutting_down_{0};
 
+  /// The MonotonicMillis() value after which we should shut down regardless of registered
+  /// client requests and running finstances. Set before 'shutting_down_' and updated
+  /// atomically if a new shutdown command with a shorter deadline comes in.
+  AtomicInt64 shutdown_deadline_{0};
+};
 }
 
 #endif
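
Note on the shutdown sequence documented in the class comment: steps 3 to 5
reduce to a decision the shutdown thread makes on each poll. The sketch below
restates that decision as two pure functions over plain integers. The struct
'ShutdownSnapshot' and the functions 'MakeSnapshot' and 'ShouldExit' are
hypothetical names for this illustration and are not part of the patch; the
patch computes the same quantities from flags and metrics.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical plain struct mirroring the four fields of TShutdownStatus.
struct ShutdownSnapshot {
  int64_t grace_remaining_ms;
  int64_t deadline_remaining_ms;
  int64_t finstances_executing;
  int64_t client_requests_registered;
};

// Builds a snapshot from monotonic timestamps in milliseconds. Remaining
// times are clamped at 0 once the corresponding period has expired.
ShutdownSnapshot MakeSnapshot(int64_t start_ms, int64_t deadline_ms, int64_t now_ms,
    int64_t grace_period_ms, int64_t finstances, int64_t requests) {
  assert(start_ms > 0 && deadline_ms > 0);
  ShutdownSnapshot s;
  s.grace_remaining_ms = std::max<int64_t>(0, grace_period_ms - (now_ms - start_ms));
  s.deadline_remaining_ms = std::max<int64_t>(0, deadline_ms - now_ms);
  s.finstances_executing = finstances;
  s.client_requests_registered = requests;
  return s;
}

// True if the daemon should exit: either the grace period is over and nothing
// is running or registered, or the deadline has passed.
bool ShouldExit(const ShutdownSnapshot& s) {
  bool quiesced = s.grace_remaining_ms <= 0 && s.finstances_executing == 0
      && s.client_requests_registered == 0;
  bool deadline_expired = s.deadline_remaining_ms <= 0;
  return quiesced || deadline_expired;
}
```

An idle daemon therefore still waits out the full grace period, because a
coordinator may schedule fragments on it before the quiescing flag propagates.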

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/testutil/fault-injection-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/fault-injection-util.h 
b/be/src/testutil/fault-injection-util.h
index f17327f..f545e1f 100644
--- a/be/src/testutil/fault-injection-util.h
+++ b/be/src/testutil/fault-injection-util.h
@@ -34,6 +34,7 @@ class FaultInjectionUtil {
     RPC_UPDATEFILTER,
     RPC_TRANSMITDATA,
     RPC_REPORTEXECSTATUS,
+    RPC_REMOTESHUTDOWN,
     RPC_RANDOM    // This must be last.
   };
 

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/be/src/util/default-path-handlers.cc
----------------------------------------------------------------------
diff --git a/be/src/util/default-path-handlers.cc 
b/be/src/util/default-path-handlers.cc
index 0b64702..c492d06 100644
--- a/be/src/util/default-path-handlers.cc
+++ b/be/src/util/default-path-handlers.cc
@@ -251,11 +251,20 @@ void RootHandler(const Webserver::ArgumentMap& args, 
Document* document) {
 
   ExecEnv* env = ExecEnv::GetInstance();
   if (env == nullptr || env->impala_server() == nullptr) return;
+  ImpalaServer* impala_server = env->impala_server();
   document->AddMember("impala_server_mode", true, document->GetAllocator());
-  document->AddMember("is_coordinator", env->impala_server()->IsCoordinator(),
+  document->AddMember("is_coordinator", impala_server->IsCoordinator(),
       document->GetAllocator());
-  document->AddMember("is_executor", env->impala_server()->IsExecutor(),
+  document->AddMember("is_executor", impala_server->IsExecutor(),
       document->GetAllocator());
+  bool is_quiescing = impala_server->IsShuttingDown();
+  document->AddMember("is_quiescing", is_quiescing, document->GetAllocator());
+  if (is_quiescing) {
+    Value shutdown_status(
+        impala_server->ShutdownStatusToString(impala_server->GetShutdownStatus()).c_str(),
+        document->GetAllocator());
+    document->AddMember("shutdown_status", shutdown_status, document->GetAllocator());
+  }
 }
 
 void AddDefaultUrlCallbacks(Webserver* webserver, MemTracker* 
process_mem_tracker,

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 9419b17..249636b 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -513,6 +513,31 @@ struct TSetQueryOptionRequest {
   3: optional bool is_set_all
 }
 
+struct TShutdownParams {
+  // Set if a backend was specified as an argument to the shutdown function. If not set,
+  // the current impala daemon will be shut down. If the port was specified, it is set
+  // in 'backend'. If it was not specified, it is 0 and the port configured for this
+  // Impala daemon is assumed.
+  1: optional Types.TNetworkAddress backend
+
+  // Deadline in seconds for shutting down.
+  2: optional i64 deadline_s
+}
+
+// The type of administrative function to be executed.
+enum TAdminRequestType {
+  SHUTDOWN
+}
+
+// Parameters for administrative function statement. This is essentially a tagged union
+// that contains parameters for the type of administrative statement to be executed.
+struct TAdminRequest {
+  1: required TAdminRequestType type
+
+  // The below member corresponding to 'type' should be set.
+  2: optional TShutdownParams shutdown_params
+}
+
 // HiveServer2 Metadata operations (JniFrontend.hiveServer2MetadataOperation)
 enum TMetadataOpcode {
   GET_TYPE_INFO,
@@ -605,6 +630,9 @@ struct TExecRequest {
 
   // Profile information from the planning process.
   13: optional RuntimeProfile.TRuntimeProfileNode profile
+
+  // Set iff stmt_type is ADMIN_FN.
+  14: optional TAdminRequest admin_request
 }
 
 // Parameters to FeSupport.cacheJar().

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index 5367a6e..cf8233d 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -851,6 +851,38 @@ struct TPublishFilterParams {
 struct TPublishFilterResult {
 }
 
+// RemoteShutdown
+
+struct TRemoteShutdownParams {
+  // Deadline for the shutdown. After this deadline expires (starting at the time when
+  // this remote shutdown command is received), the Impala daemon exits immediately
+  // regardless of whether queries are still executing.
+  1: optional i64 deadline_s
+}
+
+// The current status of a shutdown operation.
+struct TShutdownStatus {
+  // Milliseconds remaining in startup grace period. 0 if the period has expired.
+  1: required i64 grace_remaining_ms
+
+  // Milliseconds remaining in shutdown deadline. 0 if the deadline has expired.
+  2: required i64 deadline_remaining_ms
+
+  // Number of fragment instances still executing.
+  3: required i64 finstances_executing
+
+  // Number of client requests still registered with the Impala server that is being shut
+  // down.
+  4: required i64 client_requests_registered
+}
+
+struct TRemoteShutdownResult {
+  // Success or failure of the operation.
+  1: required Status.TStatus status
+
+  // If status is OK, additional info about the shutdown status
+  2: required TShutdownStatus shutdown_status
+}
 
 service ImpalaInternalService {
   // Called by coord to start asynchronous execution of a query's fragment 
instances in
@@ -875,4 +907,7 @@ service ImpalaInternalService {
   // Called by the coordinator to deliver global runtime filters to fragments 
for
   // application at plan nodes.
   TPublishFilterResult PublishFilter(1:TPublishFilterParams params);
+
+  // Called to initiate shutdown of this backend.
+  TRemoteShutdownResult RemoteShutdown(1:TRemoteShutdownParams params);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift 
b/common/thrift/StatestoreService.thrift
index 1c82170..bce0f65 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -74,6 +74,10 @@ struct TBackendDescriptor {
 
   // The process memory limit of this backend (in bytes).
   8: required i64 proc_mem_limit;
+
+  // True if fragment instances should not be scheduled on this daemon because the
+  // daemon is quiescing, e.g. if it is shutting down.
+  9: required bool is_quiescing;
 }
 
 // Description of a single entry in a topic

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/common/thrift/Types.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index d1dd5a3..6236f10 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -100,7 +100,8 @@ enum TStmtType {
   DML, // Data modification e.g. INSERT
   EXPLAIN,
   LOAD, // Statement type for LOAD commands
-  SET
+  SET,
+  ADMIN_FN // Admin function, e.g. ": shutdown()".
 }
 
 // Level of verboseness for "explain" output.

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py 
b/common/thrift/generate_error_codes.py
index c1f8b7e..84859b1 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -364,6 +364,8 @@ error_codes = (
    "on backend $2 at offset $3: verification of read data failed."),
 
   ("CANCELLED_INTERNALLY", 119, "Cancelled in $0"),
+
+  ("SERVER_SHUTTING_DOWN", 120, "Server is being shut down: $0."),
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index e334591..4b6006a 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -537,6 +537,9 @@ nonterminal ShowFunctionsStmt show_functions_stmt;
 nonterminal DropFunctionStmt drop_function_stmt;
 nonterminal TFunctionCategory opt_function_category;
 
+// Admin statements.
+nonterminal AdminFnStmt admin_fn_stmt;
+
 precedence left KW_OR;
 precedence left KW_AND;
 precedence right KW_NOT, NOT;
@@ -671,6 +674,8 @@ stmt ::=
   {: RESULT = revoke_privilege; :}
   | comment_on_stmt:comment_on
   {: RESULT = comment_on; :}
+  | admin_fn_stmt:shutdown
+  {: RESULT = shutdown; :}
   | stmt:s SEMICOLON
   {: RESULT = s; :}
   ;
@@ -2513,6 +2518,14 @@ set_stmt ::=
   {: RESULT = new SetStmt(null, null, false); :}
   ;
 
+// Top-level function call, e.g. ": shutdown()", used for admin commands, etc.
+admin_fn_stmt ::=
+    COLON ident_or_default:fn_name LPAREN RPAREN
+  {: RESULT = new AdminFnStmt(fn_name, Collections.<Expr>emptyList()); :}
+  | COLON ident_or_default:fn_name LPAREN expr_list:params RPAREN
+  {: RESULT = new AdminFnStmt(fn_name, params); :}
+  ;
+
 select_list ::=
   select_list_item:item
   {:

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java
new file mode 100644
index 0000000..2f2eb2e
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import java.util.List;
+
+import org.apache.impala.authorization.PrivilegeRequestBuilder;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TAdminRequest;
+import org.apache.impala.thrift.TAdminRequestType;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TShutdownParams;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Represents an administrative function call, e.g. ": shutdown('hostname:123')".
+ *
+ * This "admin statement" framework provides a way to expand the set of supported admin
+ * statements without modifying the SQL grammar. For now, the only supported function is
+ * shutdown(), so the logic in here is not generic.
+ */
+public class AdminFnStmt extends StatementBase {
+  // Name of the function. Validated during analysis.
+  private final String fnName_;
+
+  // Arguments to the function. Always non-null.
+  private final List<Expr> params_;
+
+  // Parameters for the shutdown() command.
+  // Address of the backend to shut down. If 'backend_' is null, that means the current
+  // server. If 'backend_.port' is 0, we assume the backend has the same port as this
+  // impalad.
+  private TNetworkAddress backend_;
+  // Deadline in seconds. -1 if no deadline specified.
+  private long deadlineSecs_;
+
+  public AdminFnStmt(String fnName, List<Expr> params) {
+    this.fnName_ = fnName;
+    this.params_ = params;
+  }
+
+  @Override
+  public String toSql() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(":").append(fnName_).append("(");
+    List<String> paramsSql = Lists.newArrayList();
+    for (Expr param: params_) paramsSql.add(param.toSql());
+    sb.append(Joiner.on(", ").join(paramsSql));
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public TAdminRequest toThrift() throws InternalException {
+    TAdminRequest result = new TAdminRequest();
+    result.type = TAdminRequestType.SHUTDOWN;
+    result.shutdown_params = new TShutdownParams();
+    if (backend_ != null) result.shutdown_params.setBackend(backend_);
+    if (deadlineSecs_ != -1) result.shutdown_params.setDeadline_s(deadlineSecs_);
+    return result;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    for (Expr param : params_) param.analyze(analyzer);
+    // Only shutdown is supported.
+    if (fnName_.toLowerCase().equals("shutdown")) {
+      analyzeShutdown(analyzer);
+    } else {
+      throw new AnalysisException("Unknown admin function: " + fnName_);
+    }
+  }
+
+  /**
+   * Supports optionally specifying the backend and the deadline: either shutdown(),
+   * shutdown('host:port'), shutdown(deadline), shutdown('host:port', deadline).
+   */
+  private void analyzeShutdown(Analyzer analyzer) throws AnalysisException {
+    if (analyzer.getAuthzConfig().isEnabled()) {
+      // Only admins (i.e. user with ALL privilege on server) can execute admin functions.
+      String authzServer = analyzer.getAuthzConfig().getServerName();
+      Preconditions.checkNotNull(authzServer);
+      analyzer.registerPrivReq(
+          new PrivilegeRequestBuilder().onServer(authzServer).all().toRequest());
+    }
+
+    // TODO: this parsing and type checking logic is specific to the command, similar to
+    // handling of other top-level commands. If we add a lot more of these functions we
+    // could consider making it generic, similar to handling of normal function calls.
+    Pair<Expr, Expr> args = getShutdownArgs();
+    Expr backendExpr = args.first;
+    Expr deadlineExpr = args.second;
+    backend_ = null;
+    deadlineSecs_ = -1;
+    if (backendExpr != null) {
+      if (!(backendExpr instanceof StringLiteral)) {
+        throw new AnalysisException(
+            "Invalid backend, must be a string literal: " + backendExpr.toSql());
+      }
+      backend_ = parseBackendAddress(((StringLiteral) backendExpr).getValue());
+    }
+    if (deadlineExpr != null) {
+      deadlineSecs_ = deadlineExpr.evalToNonNegativeInteger(analyzer, "deadline");
+    }
+  }
+
+  // Return a pair of the backend and deadline arguments, null if not present.
+  private Pair<Expr, Expr> getShutdownArgs() throws AnalysisException {
+    if (params_.size() == 0) {
+      return Pair.create(null, null);
+    } else if (params_.size() == 1) {
+      if (params_.get(0).getType().isStringType()) {
+        return Pair.create(params_.get(0), null);
+      } else {
+        return Pair.create(null, params_.get(0));
+      }
+    } else if (params_.size() == 2) {
+      return Pair.create(params_.get(0), params_.get(1));
+    } else {
+      throw new AnalysisException("Shutdown takes 0, 1 or 2 arguments: " + toSql());
+    }
+  }
+
+  // Parse the backend and optional port from 'backend'. Port is set to 0 if not set in
+  // the string.
+  private TNetworkAddress parseBackendAddress(String backend) throws AnalysisException {
+    TNetworkAddress result = new TNetworkAddress();
+    // Extract host and port from backend string.
+    String[] toks = backend.trim().split(":");
+    if (toks.length == 0 || toks.length > 2) {
+      throw new AnalysisException("Invalid backend address: " + backend);
+    }
+    result.hostname = toks[0];
+    result.port = 0;
+    if (toks.length == 2) {
+      try {
+        result.port = Integer.parseInt(toks[1]);
+      } catch (NumberFormatException nfe) {
+        throw new AnalysisException(
+            "Invalid port number in backend address: " + backend);
+      }
+    }
+    return result;
+  }
+}
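
Note on the 'host[:port]' convention used by shutdown(): a missing port is
recorded as 0, meaning "same port as the local daemon". The sketch below shows
that convention in C++ for illustration only; 'BackendAddress' and
'ParseBackendAddress' are hypothetical names, not part of the patch. It is
deliberately stricter than the Java code above: it rejects an empty host, a
trailing colon, and ports above 65535, and it does not trim whitespace.

```cpp
#include <cassert>
#include <cctype>
#include <string>

// Hypothetical C++ analogue of TNetworkAddress, for illustration only.
struct BackendAddress {
  std::string hostname;
  int port = 0;  // 0 means "use the same port as the local daemon".
};

// Parses "host" or "host:port" into 'out'. Returns false on more than one ':',
// an empty host, or a port that is not a decimal number in [0, 65535].
// 'out' is only modified on success.
bool ParseBackendAddress(const std::string& backend, BackendAddress* out) {
  assert(out != nullptr);
  std::string::size_type colon = backend.find(':');
  if (colon != backend.rfind(':')) return false;  // More than one ':'.
  std::string host = backend.substr(0, colon);
  if (host.empty()) return false;
  int port = 0;
  if (colon != std::string::npos) {
    std::string port_str = backend.substr(colon + 1);
    // At most 5 digits, so the accumulation below cannot overflow an int.
    if (port_str.empty() || port_str.size() > 5) return false;
    for (char c : port_str) {
      if (!std::isdigit(static_cast<unsigned char>(c))) return false;
      port = port * 10 + (c - '0');
    }
    if (port > 65535) return false;
  }
  out->hostname = host;
  out->port = port;
  return true;
}
```

Keeping 0 as the "unspecified" marker lets the caller substitute its own
backend port later, which is how the patch describes handling a bare hostname.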

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java 
b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index 2f51c71..a10f5f0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -132,6 +132,7 @@ public class AnalysisContext {
       return stmt_ instanceof ShowCreateFunctionStmt;
     }
     public boolean isShowFilesStmt() { return stmt_ instanceof ShowFilesStmt; }
+    public boolean isAdminFnStmt() { return stmt_ instanceof AdminFnStmt; }
     public boolean isDescribeDbStmt() { return stmt_ instanceof 
DescribeDbStmt; }
     public boolean isDescribeTableStmt() { return stmt_ instanceof 
DescribeTableStmt; }
     public boolean isResetMetadataStmt() { return stmt_ instanceof 
ResetMetadataStmt; }
@@ -366,6 +367,11 @@ public class AnalysisContext {
       return (AlterDbStmt) stmt_;
     }
 
+    public AdminFnStmt getAdminFnStmt() {
+      Preconditions.checkState(isAdminFnStmt());
+      return (AdminFnStmt) stmt_;
+    }
+
     public StatementBase getStmt() { return stmt_; }
     public Analyzer getAnalyzer() { return analyzer_; }
     public Set<TAccessEvent> getAccessEvents() { return 
analyzer_.getAccessEvents(); }

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/fe/src/main/java/org/apache/impala/analysis/Expr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java 
b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index 77797bc..55453b5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -33,8 +33,11 @@ import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
 import org.apache.impala.common.TreeNode;
 import org.apache.impala.rewrite.ExprRewriter;
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TExprNode;
 import org.apache.impala.thrift.TFunction;
@@ -1504,4 +1507,63 @@ abstract public class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
     };
     return Joiner.on(",").join(Iterables.transform(exprs, toSql));
   }
+
+  /**
+   * Analyzes and evaluates expression to an integral value, returned as a long.
+   * Throws if the expression cannot be evaluated or if the value evaluates to null.
+   * The 'name' parameter is used in exception messages, e.g. "LIMIT expression
+   * evaluates to NULL".
+   */
+  public long evalToInteger(Analyzer analyzer, String name) throws AnalysisException {
+    // Check for slotrefs and subqueries before analysis so we can provide a more
+    // helpful error message.
+    if (contains(SlotRef.class) || contains(Subquery.class)) {
+      throw new AnalysisException(name + " expression must be a constant expression: " +
+          toSql());
+    }
+    analyze(analyzer);
+    if (!isConstant()) {
+      throw new AnalysisException(name + " expression must be a constant expression: " +
+          toSql());
+    }
+    if (!getType().isIntegerType()) {
+      throw new AnalysisException(name + " expression must be an integer type but is '" +
+          getType() + "': " + toSql());
+    }
+    TColumnValue val = null;
+    try {
+      val = FeSupport.EvalExprWithoutRow(this, analyzer.getQueryCtx());
+    } catch (InternalException e) {
+      throw new AnalysisException("Failed to evaluate expr: " + toSql(), e);
+    }
+    long value;
+    if (val.isSetLong_val()) {
+      value = val.getLong_val();
+    } else if (val.isSetInt_val()) {
+      value = val.getInt_val();
+    } else if (val.isSetShort_val()) {
+      value = val.getShort_val();
+    } else if (val.isSetByte_val()) {
+      value = val.getByte_val();
+    } else {
+      throw new AnalysisException(name + " expression evaluates to NULL: " + toSql());
+    }
+    return value;
+  }
+
+  /**
+   * Analyzes and evaluates expression to a non-negative integral value, returned as a
+   * long. Throws if the expression cannot be evaluated, if the value evaluates to null,
+   * or if the result is negative. The 'name' parameter is used in exception messages,
+   * e.g. "LIMIT expression evaluates to NULL".
+   */
+  public long evalToNonNegativeInteger(Analyzer analyzer, String name)
+      throws AnalysisException {
+    long value = evalToInteger(analyzer, name);
+    if (value < 0) {
+      throw new AnalysisException(name + " must be a non-negative integer: " +
+          toSql() + " = " + value);
+    }
+    return value;
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/fe/src/main/java/org/apache/impala/analysis/LimitElement.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/LimitElement.java b/fe/src/main/java/org/apache/impala/analysis/LimitElement.java
index ab5dac1..73758a2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LimitElement.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LimitElement.java
@@ -18,9 +18,6 @@
 package org.apache.impala.analysis;
 
 import org.apache.impala.common.AnalysisException;
-import org.apache.impala.common.InternalException;
-import org.apache.impala.service.FeSupport;
-import org.apache.impala.thrift.TColumnValue;
 
 import com.google.common.base.Preconditions;
 
@@ -103,63 +100,14 @@ class LimitElement {
   public void analyze(Analyzer analyzer) throws AnalysisException {
     isAnalyzed_ = true;
     if (limitExpr_ != null) {
-      limit_ = evalIntegerExpr(analyzer, limitExpr_, "LIMIT");
+      limit_ = limitExpr_.evalToNonNegativeInteger(analyzer, "LIMIT");
     }
     if (limit_ == 0) analyzer.setHasEmptyResultSet();
     if (offsetExpr_ != null) {
-      offset_ = evalIntegerExpr(analyzer, offsetExpr_, "OFFSET");
+      offset_ = offsetExpr_.evalToNonNegativeInteger(analyzer, "OFFSET");
     }
   }
 
-  /**
-   * Analyzes and evaluates expression to a non-zero integral value, returned as a long.
-   * Throws if the expression cannot be evaluated, if the value evaluates to null, or if
-   * the result is negative. The 'name' parameter is used in exception messages, e.g.
-   * "LIMIT expression evaluates to NULL".
-   */
-  private static long evalIntegerExpr(Analyzer analyzer, Expr expr, String name)
-      throws AnalysisException {
-    // Check for slotrefs and subqueries before analysis so we can provide a more
-    // helpful error message.
-    if (expr.contains(SlotRef.class) || expr.contains(Subquery.class)) {
-      throw new AnalysisException(name + " expression must be a constant expression: " +
-          expr.toSql());
-    }
-    expr.analyze(analyzer);
-    if (!expr.isConstant()) {
-      throw new AnalysisException(name + " expression must be a constant expression: " +
-          expr.toSql());
-    }
-    if (!expr.getType().isIntegerType()) {
-      throw new AnalysisException(name + " expression must be an integer type but is '" +
-          expr.getType() + "': " + expr.toSql());
-    }
-    TColumnValue val = null;
-    try {
-      val = FeSupport.EvalExprWithoutRow(expr, analyzer.getQueryCtx());
-    } catch (InternalException e) {
-      throw new AnalysisException("Failed to evaluate expr: " + expr.toSql(), e);
-    }
-    long value;
-    if (val.isSetLong_val()) {
-      value = val.getLong_val();
-    } else if (val.isSetInt_val()) {
-      value = val.getInt_val();
-    } else if (val.isSetShort_val()) {
-      value = val.getShort_val();
-    } else if (val.isSetByte_val()) {
-      value = val.getByte_val();
-    } else {
-      throw new AnalysisException(name + " expression evaluates to NULL: " +
-          expr.toSql());
-    }
-    if (value < 0) {
-      throw new AnalysisException(name + " must be a non-negative integer: " +
-          expr.toSql() + " = " + value);
-    }
-    return value;
-  }
-
   @Override
   public LimitElement clone() { return new LimitElement(this); }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index b7a04ef..2c527cc 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -99,6 +99,7 @@ import org.apache.impala.planner.HdfsScanNode;
 import org.apache.impala.planner.PlanFragment;
 import org.apache.impala.planner.Planner;
 import org.apache.impala.planner.ScanNode;
+import org.apache.impala.thrift.TAdminRequest;
 import org.apache.impala.thrift.TAlterDbParams;
 import org.apache.impala.thrift.TCatalogOpRequest;
 import org.apache.impala.thrift.TCatalogOpType;
@@ -134,6 +135,7 @@ import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TResultSetMetadata;
 import org.apache.impala.thrift.TShowFilesParams;
 import org.apache.impala.thrift.TShowStatsOp;
+import org.apache.impala.thrift.TShutdownParams;
 import org.apache.impala.thrift.TStmtType;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
@@ -1130,6 +1132,12 @@ public class Frontend {
           new TColumn("level", Type.STRING.toThrift()))));
       result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
       return result;
+    } else if (analysisResult.isAdminFnStmt()) {
+      result.stmt_type = TStmtType.ADMIN_FN;
+      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
+          new TColumn("summary", Type.STRING.toThrift()))));
+      result.setAdmin_request(analysisResult.getAdminFnStmt().toThrift());
+      return result;
     }
     // If unset, set MT_DOP to 0 to simplify the rest of the code.
     if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0);

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 520b88f..fe7e352 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -3861,4 +3861,54 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     fnName.analyze(dummyAnalyzer, false);
     assertFalse(fnName.isBuiltin());
   }
+
+  @Test
+  public void TestAdminFns() throws ImpalaException {
+    AnalyzesOk(": shutdown()");
+    AnalyzesOk(":sHuTdoWn()");
+    AnalyzesOk(":   SHUTDOWN()");
+    AnalyzesOk(": sHuTdoWn('hostname')");
+    AnalyzesOk(": sHuTdoWn(\"hostname\")");
+    AnalyzesOk(": sHuTdoWn(\"hostname:1234\")");
+    AnalyzesOk(": shutdown(10)");
+    AnalyzesOk(": shutdown('hostname', 10)");
+    AnalyzesOk(": shutdown('hostname:11', 10)");
+    AnalyzesOk(": shutdown('hostname:11', 10 * 60)");
+    AnalyzesOk(": shutdown(10 * 60)");
+    AnalyzesOk(": shutdown(0)");
+
+    // Unknown admin functions.
+    AnalysisError(": foobar()", "Unknown admin function: foobar");
+    AnalysisError(": 1a()", "Unknown admin function: 1a");
+    AnalysisError(": foobar(1,2,3)", "Unknown admin function: foobar");
+
+    // Invalid number of shutdown params.
+    AnalysisError(": shutdown('a', 'b', 'c', 'd')",
+        "Shutdown takes 0, 1 or 2 arguments: :shutdown('a', 'b', 'c', 'd')");
+    AnalysisError(": shutdown(1, 2, 3)",
+        "Shutdown takes 0, 1 or 2 arguments: :shutdown(1, 2, 3)");
+
+    // Invalid type of shutdown params.
+    AnalysisError(": shutdown(a)",
+        "Could not resolve column/field reference: 'a'");
+    AnalysisError(": shutdown(1, 2)",
+        "Invalid backend, must be a string literal: 1");
+    AnalysisError(": shutdown(concat('host:', '1234'), 2)",
+        "Invalid backend, must be a string literal: concat('host:', '1234')");
+    AnalysisError(": shutdown('backend:1234', '...')",
+        "deadline expression must be an integer type but is 'STRING': '...'");
+    AnalysisError(": shutdown(true)",
+        "deadline expression must be an integer type but is 'BOOLEAN': TRUE");
+
+    // Invalid host/port.
+    AnalysisError(": shutdown('foo:bar')",
+        "Invalid port number in backend address: foo:bar");
+    AnalysisError(": shutdown('foo:bar:1234')",
+        "Invalid backend address: foo:bar:1234");
+
+    // Invalid deadline value.
+    AnalysisError(": shutdown(-1)", "deadline must be a non-negative integer: -1 = -1");
+    AnalysisError(": shutdown(1.234)",
+        "deadline expression must be an integer type but is 'DECIMAL(4,3)': 1.234");
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
index c30a97a..09d9d57 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
@@ -2623,6 +2623,19 @@ public class AuthorizationStmtTest extends FrontendTestBase {
     }
   }
 
+  @Test
+  public void testShutdown() throws ImpalaException {
+    // Requires ALL privilege on server.
+    authorize(": shutdown()")
+        .ok(onServer(TPrivilegeLevel.ALL))
+        .error(accessError("server"))
+        .error(accessError("server"), onServer(TPrivilegeLevel.REFRESH))
+        .error(accessError("server"), onServer(TPrivilegeLevel.SELECT))
+        .error(accessError("server"), onDatabase("functional", TPrivilegeLevel.ALL))
+        .error(accessError("server"),
+            onTable("functional", "alltypes", TPrivilegeLevel.ALL));
+  }
+
   // Convert TDescribeResult to list of strings.
   private static List<String> resultToStringList(TDescribeResult result) {
     List<String> list = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index be97d67..5ad2876 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -3838,4 +3838,39 @@ public class ParserTest extends FrontendTestBase {
       ParserError(String.format("ALTER %s SET OWNER", type));
     }
   }
+
+  public void TestAdminFns() {
+    // Any combination of whitespace is ok.
+    ParsesOk(":foobar()");
+    ParsesOk(": foobar()");
+    ParsesOk(":\tfoobar()");
+    ParsesOk("   :\tfoobar()");
+    ParsesOk("\n:foobar()");
+    ParsesOk("\n:foobar(123)");
+    ParsesOk("\n:foobar(123, 456)");
+    ParsesOk("\n:foobar('foo', 'bar')");
+    ParsesOk("\n:foobar('foo', 'bar', 1, -1, 1234, 99, false)");
+
+    // Any identifier is supported.
+    ParsesOk(": 1a()");
+
+    // Must be prefixed with colon.
+    ParserError("foobar()");
+    ParserError("  foobar()");
+
+    // Non-identifiers not supported.
+    ParserError(": 1()");
+    ParserError(": 'string'()");
+    ParserError(": a.b()");
+
+    // Must be single function with parens. Cannot have multiple statements.
+    ParserError(": shutdown");
+    ParserError(": shutdown foo");
+    ParserError(": shutdown() other()");
+    ParserError(": shutdown(); other()");
+    ParserError(": shutdown(), other()");
+    ParserError(": shutdown() :other()");
+    ParserError(": shutdown() :other()");
+    ParserError(": shutdown('hostA'); :shutdown('hostB');");
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 7209364..dcdd814 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.fail;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.testutil.TestUtils;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.base.Preconditions;
@@ -66,6 +65,14 @@ public class ToSqlTest extends FrontendTestBase {
     }
   }
 
+  /**
+   * Helper for the common case when the string should be identical after a roundtrip
+   * through the parser.
+   */
+  private void testToSql(String query) {
+    testToSql(query, query);
+  }
+
   private void testToSql(String query, String expected) {
     testToSql(query, System.getProperty("user.name"), expected);
   }
@@ -1428,9 +1435,8 @@ public class ToSqlTest extends FrontendTestBase {
    */
   @Test
   public void testInvalidate() {
-    testToSql("INVALIDATE METADATA", "INVALIDATE METADATA");
-    testToSql("INVALIDATE METADATA functional.alltypes",
-        "INVALIDATE METADATA functional.alltypes");
+    testToSql("INVALIDATE METADATA");
+    testToSql("INVALIDATE METADATA functional.alltypes");
   }
 
   /**
@@ -1438,9 +1444,19 @@ public class ToSqlTest extends FrontendTestBase {
    */
   @Test
   public void testRefresh() {
-    testToSql("REFRESH functional.alltypes", "REFRESH functional.alltypes");
-    testToSql("REFRESH functional.alltypes PARTITION (year=2009, month=1)",
-        "REFRESH functional.alltypes PARTITION (year=2009, month=1)");
-    testToSql("REFRESH FUNCTIONS functional", "REFRESH FUNCTIONS functional");
+    testToSql("REFRESH functional.alltypes");
+    testToSql("REFRESH functional.alltypes PARTITION (year=2009, month=1)");
+    testToSql("REFRESH FUNCTIONS functional");
+  }
+
+  /**
+   * Test admin functions are output correctly.
+   */
+  @Test
+  public void testAdminFn() {
+    testToSql(":shutdown()");
+    testToSql(":shutdown('hostname')");
+    testToSql(":shutdown('hostname', 1000)");
+    testToSql(":shutdown(1000)");
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/tests/common/impala_cluster.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index f04b4b9..f25c8ed 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -161,6 +161,9 @@ class Process(object):
   def get_pid(self):
     """Gets the PID of the process. Returns None if the PID cannot be determined"""
     LOG.info("Attempting to find PID for %s" % ' '.join(self.cmd))
+    return self.__get_pid()
+
+  def __get_pid(self):
     for pid in psutil.get_pid_list():
       try:
         process = psutil.Process(pid)
@@ -196,10 +199,16 @@ class Process(object):
   def restart(self):
     """Kills and restarts the process"""
     self.kill()
-    # Wait for a bit so the ports will be released.
-    sleep(1)
+    self.wait_for_exit()
     self.start()
 
+  def wait_for_exit(self):
+    """Wait until the process exits (or return immediately if it has already exited)."""
+    LOG.info('Waiting for exit: {0} (PID: {1})'.format(
+        ' '.join(self.cmd), self.get_pid()))
+    while self.__get_pid() is not None:
+      sleep(0.01)
+
   def __str__(self):
     return "Command: %s PID: %s" % (self.cmd, self.get_pid())
 

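The wait_for_exit() helper in the hunk above polls until the process
disappears, with no upper bound on the wait. As a minimal sketch only,
assuming a caller-supplied is_running callable and timeout_s /
poll_interval_s parameters (all hypothetical names, not part of this
patch), a bounded variant of the same polling loop could look like this:

```python
import time


def wait_for_exit(is_running, timeout_s=10.0, poll_interval_s=0.01):
    """Poll is_running() until it returns False or timeout_s elapses.

    Returns True if the process exited in time, False on timeout.
    NOTE: hypothetical helper for illustration; the patch's version has
    no timeout and looks the PID up via psutil instead of a callable.
    """
    deadline = time.monotonic() + timeout_s
    while is_running():
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval_s)
    return True


# Simulated process that reports "still running" three times, then exits.
remaining_polls = [3]


def fake_is_running():
    remaining_polls[0] -= 1
    return remaining_polls[0] >= 0


exited = wait_for_exit(fake_is_running, timeout_s=5.0)
print("exited:", exited)
```

Returning a boolean rather than raising leaves it to the caller to decide
whether a timeout should fail the test or escalate to a hard kill.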
http://git-wip-us.apache.org/repos/asf/impala/blob/f46de211/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index f23c448..0ad4496 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -172,12 +172,16 @@ class ImpaladService(BaseImpalaService):
     self.be_port = be_port
     self.hs2_port = hs2_port
 
-  def get_num_known_live_backends(self, timeout=30, interval=1):
+  def get_num_known_live_backends(self, timeout=30, interval=1,
+      include_shutting_down=True):
     LOG.info("Getting num_known_live_backends from %s:%s" %
         (self.hostname, self.webserver_port))
     result = json.loads(self.read_debug_webpage('backends?json', timeout, interval))
-    num = len(result['backends'])
-    return None if num is None else int(num)
+    count = 0
+    for backend in result['backends']:
+      if include_shutting_down or not backend['is_quiescing']:
+        count += 1
+    return count
 
   def get_query_locations(self):
     # Returns a dictionary of the format <host_address, num_of_queries_running_there>
@@ -207,12 +211,14 @@ class ImpaladService(BaseImpalaService):
         (num_in_flight_queries, expected_val))
     return False
 
-  def wait_for_num_known_live_backends(self, expected_value, timeout=30, interval=1):
+  def wait_for_num_known_live_backends(self, expected_value, timeout=30, interval=1,
+      include_shutting_down=True):
     start_time = time()
     while (time() - start_time < timeout):
       value = None
       try:
-        value = self.get_num_known_live_backends(timeout=timeout, interval=interval)
+        value = self.get_num_known_live_backends(timeout=timeout, interval=interval,
+            include_shutting_down=include_shutting_down)
       except Exception, e:
         LOG.error(e)
       if value == expected_value:
@@ -250,7 +256,9 @@ class ImpaladService(BaseImpalaService):
       if query_state == target_state:
         return
       sleep(interval)
-    assert target_state == query_state, 'Did not reach query state in time'
+    assert target_state == query_state, \
+        'Did not reach query state in time target={0} actual={1}'.format(
+            target_state, query_state)
     return
 
   def wait_for_query_status(self, client, query_id, expected_content,
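
The include_shutting_down flag added to get_num_known_live_backends()
above decides whether backends with 'is_quiescing' set are counted. A
small standalone sketch of that counting rule follows, run against a
made-up payload. The 'backends' and 'is_quiescing' keys come from the
patch; the count_live_backends name, the 'address' field and the sample
hosts are assumptions for illustration only.

```python
import json


def count_live_backends(backends_json, include_shutting_down=True):
    """Count backends in a 'backends?json'-style payload.

    Backends with is_quiescing set are skipped unless
    include_shutting_down is True.
    """
    result = json.loads(backends_json)
    return sum(1 for backend in result["backends"]
               if include_shutting_down or not backend["is_quiescing"])


# Hypothetical payload: three backends, one of them shutting down.
SAMPLE = json.dumps({"backends": [
    {"address": "host-1:22000", "is_quiescing": False},
    {"address": "host-2:22000", "is_quiescing": True},
    {"address": "host-3:22000", "is_quiescing": False},
]})

total = count_live_backends(SAMPLE)
active = count_live_backends(SAMPLE, include_shutting_down=False)
print(total, active)  # prints "3 2"
```

Keeping the default at True keeps the count the same as the previous
len(result['backends']) for callers that do not pass the new flag.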
