This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new d7ee509e9 IMPALA-12648: Add KILL QUERY statement
d7ee509e9 is described below

commit d7ee509e937de0314e423326f8aeeffe0a49b6fb
Author: Xuebin Su <[email protected]>
AuthorDate: Tue Oct 15 11:04:45 2024 +0800

    IMPALA-12648: Add KILL QUERY statement
    
    To support killing queries programmatically, this patch adds a new
    type of SQL statement, called the KILL QUERY statement, to cancel and
    unregister a query on any coordinator in the cluster.
    
    A KILL QUERY statement looks like
    ```
    KILL QUERY '123:456';
    ```
    where `123:456` is the query id of the query we want to kill. We follow
    syntax from HIVE-17483. For backward compatibility, 'KILL' and 'QUERY'
    are added as "unreserved keywords", like 'DEFAULT'. This allows the
    three keywords to be used as identifiers.
    
    A user is authorized to kill a query only if the user is an admin or is
    the owner of the query. KILL QUERY statements are not affected by
    admission control.
    
    Implementation:
    
    Since we don't know in advance which impalad is the coordinator of the
    query we want to kill, we need to broadcast the kill request to all the
    coordinators in the cluster. Upon receiving a kill request, each
    coordinator checks whether it is the coordinator of the query:
    - If yes, it cancels and unregisters the query,
    - If no, it reports "Invalid or unknown query handle".
    
    Currently, a KILL QUERY statement is not interruptible. IMPALA-13663 is
    created for this.
    
    For authorization, this patch adds a custom handler of
    AuthorizationException for each statement to allow the exception to be
    handled by the backend. This is because we don't know whether the user
    is the owner of the query until we reach its coordinator.
    
    To support cancelling child queries, this patch changes
    ChildQuery::Cancel() to bypass the HS2 layer so that the session of the
    child query will not be added to the connection used to execute the
    KILL QUERY statement.
    
    Testing:
    - A new ParserTest case is added to test using "unreserved keywords" as
      identifiers.
    - New E2E test cases are added for the KILL QUERY statement.
    - Added a new dimension in TestCancellation to use the KILL QUERY
      statement.
    - Added file tests/common/cluster_config.py and made
      CustomClusterTestSuite.with_args() composable so that common cluster
      configs can be reused in custom cluster tests.
    
    Change-Id: If12d6e47b256b034ec444f17c7890aa3b40481c0
    Reviewed-on: http://gerrit.cloudera.org:8080/21930
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
---
 be/src/service/child-query.cc                      |  32 +--
 be/src/service/client-request-state.cc             | 115 ++++++++++
 be/src/service/client-request-state.h              |  12 ++
 be/src/service/control-service.cc                  |  21 ++
 be/src/service/control-service.h                   |   9 +
 be/src/service/impala-server.cc                    |  25 +++
 be/src/service/impala-server.h                     |   6 +
 common/protobuf/control_service.proto              |  18 ++
 common/thrift/Frontend.thrift                      |  14 ++
 common/thrift/Types.thrift                         |   1 +
 fe/src/main/cup/sql-parser.cup                     | 233 ++++++++++++---------
 .../apache/impala/analysis/AnalysisContext.java    |   9 +
 .../org/apache/impala/analysis/KillQueryStmt.java  |  80 +++++++
 .../org/apache/impala/analysis/StatementBase.java  |  24 +++
 .../authorization/BaseAuthorizationChecker.java    |   9 +-
 .../java/org/apache/impala/service/Frontend.java   |   6 +
 fe/src/main/jflex/sql-scanner.flex                 |   2 +
 .../org/apache/impala/analysis/ParserTest.java     |  33 ++-
 tests/common/cluster_config.py                     |  65 ++++++
 tests/common/custom_cluster_test_suite.py          |  27 ++-
 tests/common/impala_connection.py                  |  29 +++
 tests/custom_cluster/test_admission_controller.py  |  21 +-
 tests/custom_cluster/test_kill_query.py            | 142 +++++++++++++
 tests/query_test/test_cancellation.py              |  15 +-
 tests/query_test/test_kill_query.py                |  83 ++++++++
 tests/util/cancel_util.py                          | 101 ++++++++-
 26 files changed, 973 insertions(+), 159 deletions(-)

diff --git a/be/src/service/child-query.cc b/be/src/service/child-query.cc
index 78c94a01e..d2d783f59 100644
--- a/be/src/service/child-query.cc
+++ b/be/src/service/child-query.cc
@@ -149,26 +149,28 @@ void ChildQuery::Cancel() {
     if (!is_running_) return;
     is_running_ = false;
   }
-  TUniqueId session_id;
+  TUniqueId query_id;
   TUniqueId secret_unused;
   // Ignore return statuses because they are not actionable.
-  Status status = 
ImpalaServer::THandleIdentifierToTUniqueId(hs2_handle_.operationId,
-      &session_id, &secret_unused);
+  Status status = ImpalaServer::THandleIdentifierToTUniqueId(
+      hs2_handle_.operationId, &query_id, &secret_unused);
   if (status.ok()) {
-    VLOG_QUERY << "Cancelling and closing child query with operation id: " <<
-        PrintId(session_id);
+    VLOG_QUERY << "Cancelling and closing child query with operation id: "
+               << PrintId(query_id);
   } else {
-    VLOG_QUERY << "Cancelling and closing child query. Failed to get query id: 
" <<
-        status;
+    VLOG_QUERY << "Cancelling and closing child query. Failed to get query id: 
"
+               << status;
+  }
+  // Bypass the HS2 layer so that the session of the child query will not be 
added to
+  // the connection of the caller of ChildQuery::Cancel().
+  status = parent_server_->CancelInternal(query_id);
+  if (!status.ok()) {
+    LOG(ERROR) << "Failed to cancel child query: " << status.GetDetail();
+  }
+  status = parent_server_->UnregisterQuery(query_id, true, &Status::CANCELLED);
+  if (!status.ok()) {
+    LOG(ERROR) << "Failed to unregister child query: " << status.GetDetail();
   }
-  TCancelOperationResp cancel_resp;
-  TCancelOperationReq cancel_req;
-  cancel_req.operationHandle = hs2_handle_;
-  parent_server_->CancelOperation(cancel_resp, cancel_req);
-  TCloseOperationResp close_resp;
-  TCloseOperationReq close_req;
-  close_req.operationHandle = hs2_handle_;
-  parent_server_->CloseOperation(close_resp, close_req);
 }
 
 Status ChildQuery::IsCancelled() {
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index eedef67a7..08999a55c 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -359,6 +359,10 @@ Status ClientRequestState::Exec() {
     case TStmtType::UNKNOWN:
       DCHECK(false);
       return Status("Exec request uninitialized during execution");
+    case TStmtType::KILL:
+      DCHECK(exec_req.__isset.kill_query_request);
+      LOG_AND_RETURN_IF_ERROR(ExecKillQueryRequest());
+      break;
     default:
       return Status(Substitute("Unknown exec request stmt type: $0", 
exec_req.stmt_type));
   }
@@ -2460,6 +2464,117 @@ void ClientRequestState::ExecMigrateRequestImpl() {
   }
 }
 
+Status ClientRequestState::TryKillQueryLocally(
+    const TUniqueId& query_id, const string& requesting_user, bool is_admin) {
+  Status status = ExecEnv::GetInstance()->impala_server()->KillQuery(
+      query_id, requesting_user, is_admin);
+  if (status.ok()) {
+    SetResultSet({Substitute("Query $0 is killed.", PrintId(query_id))});
+    return query_status_;
+  }
+  return status;
+}
+
+Status ClientRequestState::TryKillQueryRemotely(
+    const TUniqueId& query_id, const KillQueryRequestPB& request) {
+  // The initial status should be INVALID_QUERY_HANDLE so that if there is no 
other
+  // coordinator in the cluster, it will be the status to return.
+  Status status = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, 
PrintId(query_id));
+  ExecutorGroup all_coordinators =
+      
ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot()->GetCoordinators();
+  // Skipping the current impalad.
+  unique_ptr<ExecutorGroup> 
other_coordinators{ExecutorGroup::GetFilteredExecutorGroup(
+      &all_coordinators, {ExecEnv::GetInstance()->krpc_address()})};
+  // If we get an RPC error, instead of returning immediately, we record it 
and move
+  // on to the next coordinator.
+  Status rpc_errors = Status::OK();
+  for (const auto& backend : other_coordinators->GetAllExecutorDescriptors()) {
+    // The logic here is similar to ExecShutdownRequest()
+    NetworkAddressPB krpc_addr = MakeNetworkAddressPB(backend.ip_address(),
+        backend.address().port(), backend.backend_id(),
+        ExecEnv::GetInstance()->rpc_mgr()->GetUdsAddressUniqueId());
+    VLOG_QUERY << "Sending KillQuery() RPC to " << 
NetworkAddressPBToString(krpc_addr);
+    unique_ptr<ControlServiceProxy> proxy;
+    Status get_proxy_status =
+        ControlService::GetProxy(krpc_addr, backend.address().hostname(), 
&proxy);
+    if (!get_proxy_status.ok()) {
+      Status get_proxy_status_to_report{Substitute(
+          "KillQuery: Could not get Proxy to ControlService at $0 with error: 
$1.",
+          NetworkAddressPBToString(krpc_addr), get_proxy_status.msg().msg())};
+      rpc_errors.MergeStatus(get_proxy_status_to_report);
+      LOG(ERROR) << get_proxy_status_to_report.GetDetail();
+      continue;
+    }
+    KillQueryResponsePB response;
+    const int num_retries = 3;
+    const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
+    const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
+    // Currently, a KILL QUERY statement is not interruptible.
+    Status rpc_status = RpcMgr::DoRpcWithRetry(proxy, 
&ControlServiceProxy::KillQuery,
+        request, &response, query_ctx_, "KillQuery() RPC failed", num_retries, 
timeout_ms,
+        backoff_time_ms, "CRS_KILL_QUERY_RPC");
+    if (!rpc_status.ok()) {
+      LOG(ERROR) << rpc_status.GetDetail();
+      rpc_errors.MergeStatus(rpc_status);
+      continue;
+    }
+    // Currently, we only support killing one query in one KILL QUERY 
statement.
+    DCHECK_EQ(response.statuses_size(), 1);
+    status = Status(response.statuses(0));
+    if (status.ok()) {
+      // Kill succeeded.
+      VLOG_QUERY << "KillQuery: Found the coordinator at "
+                 << NetworkAddressPBToString(krpc_addr);
+      SetResultSet({Substitute("Query $0 is killed.", PrintId(query_id))});
+      return query_status_;
+    } else if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
+      LOG(ERROR) << "KillQuery: Found the coordinator at "
+                 << NetworkAddressPBToString(krpc_addr)
+                 << " but failed to kill the query: "
+                 << status.GetDetail();
+      // Kill failed, but we found the coordinator of the query.
+      return status;
+    }
+  }
+  // We didn't find the coordinator of the query after trying all other 
coordinators.
+  // If there is any RPC error, return it.
+  if (!rpc_errors.ok()) {
+    return rpc_errors;
+  }
+  // If there is no RPC error, return INVALID_QUERY_HANDLE.
+  return status;
+}
+
+Status ClientRequestState::ExecKillQueryRequest() {
+  TUniqueId query_id = exec_request().kill_query_request.query_id;
+  string requesting_user = exec_request().kill_query_request.requesting_user;
+  bool is_admin = exec_request().kill_query_request.is_admin;
+
+  VLOG_QUERY << "Exec KillQuery: query_id=" << PrintId(query_id)
+             << ", requesting_user=" << requesting_user << ", is_admin=" << 
is_admin;
+
+  // First try cancelling the query locally.
+  Status status = TryKillQueryLocally(query_id, requesting_user, is_admin);
+  if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
+    return status;
+  }
+
+  // The current impalad is NOT the coordinator of the query. Now we have to 
broadcast
+  // the kill request to all other coordinators.
+  UniqueIdPB query_id_pb;
+  TUniqueIdToUniqueIdPB(query_id, &query_id_pb);
+  KillQueryRequestPB request;
+  *request.add_query_ids() = query_id_pb;
+  *request.mutable_requesting_user() = requesting_user;
+  request.set_is_admin(is_admin);
+  status = TryKillQueryRemotely(query_id, request);
+  if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
+    return status;
+  }
+  // All the error messages are "Invalid or unknown query handle".
+  return Status("Could not find query on any coordinator.");
+}
+
 void ClientRequestState::AddTableResetHints(const TConvertTableRequest& params,
       Status* status) const {
   string table_reset_hint("Your table might have been renamed. To reset the 
name "
diff --git a/be/src/service/client-request-state.h 
b/be/src/service/client-request-state.h
index e91f041c5..4c6f63c55 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -978,6 +978,9 @@ class ClientRequestState {
   /// Core logic of executing a MIGRATE TABLE statement.
   void ExecMigrateRequestImpl();
 
+  /// The logic of executing a KILL QUERY statement.
+  Status ExecKillQueryRequest();
+
   /// Used when running into an error during table migration to extend 
'status' with some
   /// hints about how to reset the original table name. 'params' holds the SQL 
query
   /// string the user should run.
@@ -985,5 +988,14 @@ class ClientRequestState {
 
   /// Adds the catalog execution timeline returned from catalog RPCs into the 
profile.
   void AddCatalogTimeline();
+
+  /// Try killing query with the given query_id locally as the requesting_user.
+  /// 'is_admin' is true if requesting_user is an admin.
+  Status TryKillQueryLocally(
+      const TUniqueId& query_id, const std::string& requesting_user, bool 
is_admin);
+
+  /// Try to ask other coordinators to kill query by sending the request.
+  Status TryKillQueryRemotely(
+      const TUniqueId& query_id, const KillQueryRequestPB& request);
 };
 }
diff --git a/be/src/service/control-service.cc 
b/be/src/service/control-service.cc
index d20922458..ed52f28f7 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -221,6 +221,17 @@ void ControlService::RespondAndReleaseRpc(
   rpc_context->RespondSuccess();
 }
 
+template <typename ResponsePBType>
+void ControlService::RespondAndReleaseRpc(
+    const vector<Status>& statuses, ResponsePBType* response, RpcContext* 
rpc_context) {
+  for (int i = 0; i < statuses.size(); ++i) {
+    statuses[i].ToProto(response->add_statuses());
+  }
+  // Release the memory against the control service's memory tracker.
+  mem_tracker_->Release(rpc_context->GetTransferSize());
+  rpc_context->RespondSuccess();
+}
+
 void ControlService::CancelQueryFInstances(const 
CancelQueryFInstancesRequestPB* request,
     CancelQueryFInstancesResponsePB* response, RpcContext* rpc_context) {
   DCHECK(request->has_query_id());
@@ -249,4 +260,14 @@ void ControlService::RemoteShutdown(const 
RemoteShutdownParamsPB* req,
 
   RespondAndReleaseRpc(status, response, rpc_context);
 }
+
+void ControlService::KillQuery(const KillQueryRequestPB* request,
+    KillQueryResponsePB* response, RpcContext* rpc_context) {
+  // Currently, we only support killing one query in one KILL QUERY statement.
+  DCHECK_EQ(request->query_ids_size(), 1);
+  const TUniqueId& query_id = ProtoToQueryId(request->query_ids(0));
+  Status status = ExecEnv::GetInstance()->impala_server()->KillQuery(
+      query_id, request->requesting_user(), request->is_admin());
+  RespondAndReleaseRpc(vector{status}, response, rpc_context);
+}
 }
diff --git a/be/src/service/control-service.h b/be/src/service/control-service.h
index 2411fff6f..a38f22679 100644
--- a/be/src/service/control-service.h
+++ b/be/src/service/control-service.h
@@ -76,6 +76,10 @@ class ControlService : public ControlServiceIf {
   virtual void RemoteShutdown(const RemoteShutdownParamsPB* req,
       RemoteShutdownResultPB* response, ::kudu::rpc::RpcContext* context) 
override;
 
+  /// Kill a query. The query will be unregistered.
+  virtual void KillQuery(const KillQueryRequestPB* request,
+      KillQueryResponsePB* response, ::kudu::rpc::RpcContext* rpc_context) 
override;
+
   /// Gets a ControlService proxy to a server with 'address' and 'hostname'.
   /// The newly created proxy is returned in 'proxy'. Returns error status on 
failure.
   static Status GetProxy(const NetworkAddressPB& address, const std::string& 
hostname,
@@ -97,6 +101,11 @@ class ControlService : public ControlServiceIf {
   template <typename ResponsePBType>
   void RespondAndReleaseRpc(
       const Status& status, ResponsePBType* response, kudu::rpc::RpcContext* 
rpc_context);
+
+  /// For RPCs whose responses have more than one Status.
+  template <typename ResponsePBType>
+  void RespondAndReleaseRpc(const vector<Status>& statuses, ResponsePBType* 
response,
+      kudu::rpc::RpcContext* rpc_context);
 };
 
 } // namespace impala
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 75d92f495..15d43ebe5 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1821,6 +1821,31 @@ Status ImpalaServer::CancelInternal(const TUniqueId& 
query_id) {
   return Status::OK();
 }
 
+Status ImpalaServer::KillQuery(
+    const TUniqueId& query_id, const string& requesting_user, bool is_admin) {
+  VLOG_QUERY << "KillQuery(): query_id=" << PrintId(query_id)
+             << ", requesting_user=" << requesting_user << ", is_admin=" << 
is_admin;
+  QueryHandle query_handle;
+  RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle));
+  if (!is_admin && requesting_user != query_handle->effective_user()) {
+    return Status(
+        Substitute("User '$0' is not authorized to kill the query.", 
requesting_user));
+  }
+  Status status = CancelInternal(query_id);
+  if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
+    // The current impalad is the coordinator of the query.
+    RETURN_IF_ERROR(status);
+    status = UnregisterQuery(query_id, true, &Status::CANCELLED);
+    if (status.ok() || status.code() == TErrorCode::INVALID_QUERY_HANDLE) {
+      // There might be another thread that has already unregistered the query
+      // before UnregisterQuery() and after CancelInternal(). In this case we 
are done.
+      return Status::OK();
+    }
+    return status;
+  }
+  return status;
+}
+
 Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
     const SecretArg& secret, bool ignore_if_absent) {
   DCHECK(secret.is_session_secret());
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 695587e1b..2af5cfc0e 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -722,6 +722,12 @@ class ImpalaServer : public ImpalaServiceIf,
   static int64_t DecrementCount(
       std::map<std::string, int64_t>& loads, const std::string& key);
 
+  /// Try cancelling and unregistering the query with the given query_id as the
+  /// requesting_user. 'is_admin' is true if requesting_user is an admin.
+  /// This method is used in executing a KILL QUERY statement.
+  Status KillQuery(
+      const TUniqueId& query_id, const std::string& requesting_user, bool 
is_admin);
+
  private:
   struct ExpirationEvent;
   class SecretArg;
diff --git a/common/protobuf/control_service.proto 
b/common/protobuf/control_service.proto
index 02933b6f2..b91ebbc87 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -464,6 +464,21 @@ message ExecQueryFInstancesResponsePB {
   optional StatusPB status = 1;
 }
 
+message KillQueryRequestPB {
+  // The query ids of the queries to kill.
+  repeated UniqueIdPB query_ids = 1;
+
+  // The effective user who submitted this request.
+  required string requesting_user = 2;
+
+  // True if the requesting_user is an admin.
+  required bool is_admin = 3;
+}
+
+message KillQueryResponsePB {
+  repeated StatusPB statuses = 1;
+}
+
 service ControlService {
   // Override the default authorization method.
   option (kudu.rpc.default_authz_method) = "Authorize";
@@ -485,4 +500,7 @@ service ControlService {
 
   // Called to initiate shutdown of this backend.
   rpc RemoteShutdown(RemoteShutdownParamsPB) returns (RemoteShutdownResultPB);
+
+  // Called by an impalad to ask another impalad to kill a query.
+  rpc KillQuery(KillQueryRequestPB) returns (KillQueryResponsePB);
 }
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 400e4e11d..dd7ac335f 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -635,6 +635,17 @@ struct TConvertTableRequest {
   12: optional string drop_temporary_hdfs_table_query
 }
 
+// Request for a KILL QUERY statement.
+struct TKillQueryReq {
+  1: required Types.TUniqueId query_id
+
+  // The effective user who submitted this request.
+  2: required string requesting_user;
+
+  // True if the requesting_user is an admin.
+  3: required bool is_admin;
+}
+
 // Result of call to createExecRequest()
 struct TExecRequest {
   1: required Types.TStmtType stmt_type = TStmtType.UNKNOWN
@@ -716,6 +727,9 @@ struct TExecRequest {
 
   // Columns referenced in an order by clause.
   25: optional list<string> orderby_columns
+
+  // Request for "KILL QUERY" statements.
+  26: optional TKillQueryReq kill_query_request
 }
 
 // Parameters to FeSupport.cacheJar().
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index 4e13a417f..0c951ee73 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -111,6 +111,7 @@ enum TStmtType {
   TESTCASE = 7
   CONVERT = 8
   UNKNOWN = 9
+  KILL = 10
 }
 
 enum TIcebergOperation {
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 28785fb64..553a1a599 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -87,9 +87,24 @@ parser code {:
   // TODO: remove when V1 code is dropped.
   private TQueryOptions queryOptions;
 
+  // Whether the token is an "unreserved keyword".
+  // NOTE: This method should be kept in sync with the rule for 
'ident_or_unreserved'.
+  private boolean isUnreservedKeyword(Integer tokenId) {
+    return (
+        tokenId == SqlParserSymbols.KW_DEFAULT ||
+        tokenId == SqlParserSymbols.KW_KILL ||
+        tokenId == SqlParserSymbols.KW_QUERY
+    );
+  }
+
   // to avoid reporting trivial tokens as expected tokens in error messages
-  private boolean reportExpectedToken(Integer tokenId, int numExpectedTokens) {
-    if (SqlScanner.isKeyword(tokenId) ||
+  private boolean reportExpectedToken(
+      Integer tokenId, int numExpectedTokens, boolean identifierExpected) {
+    if (identifierExpected && isUnreservedKeyword(tokenId)) {
+      // Skip all "unreserved keywords" in the expected token list if
+      // 'IDENTIFIER' exists since unreserved keywords can be used as 
identifiers.
+      return false;
+    } else if (SqlScanner.isKeyword(tokenId) ||
         tokenId.intValue() == SqlParserSymbols.COMMA ||
         tokenId.intValue() == SqlParserSymbols.IDENT) {
       return true;
@@ -253,6 +268,12 @@ parser code {:
     result.append("\n");
 
     boolean identifierExpected = false;
+    for (Integer tokenId : expectedTokenIds_) {
+      if (tokenId == SqlParserSymbols.IDENT) {
+        identifierExpected = true;
+        break;
+      }
+    }
 
     // append expected tokens
     result.append("Expected: ");
@@ -261,11 +282,8 @@ parser code {:
       Integer tokenId = null;
       for (int i = 0; i < expectedTokenIds_.size(); ++i) {
         tokenId = expectedTokenIds_.get(i);
-        if (reportExpectedToken(tokenId, expectedTokenIds_.size())) {
+        if (reportExpectedToken(tokenId, expectedTokenIds_.size(), 
identifierExpected)) {
           expectedToken = SqlScanner.tokenIdMap.get(tokenId);
-          
if(expectedToken.equals(SqlScanner.tokenIdMap.get(SqlParserSymbols.IDENT))) {
-            identifierExpected = true;
-          }
           result.append(expectedToken + ", ");
         }
       }
@@ -317,13 +335,13 @@ terminal
   KW_GROUP, KW_GROUPING, KW_HASH, KW_HUDIPARQUET, KW_IGNORE, KW_HAVING, 
KW_ICEBERG, KW_IF,
   KW_ILIKE, KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, 
KW_INT,
   KW_INTERMEDIATE, KW_INTERSECT, KW_INTERVAL, KW_INTO, KW_INVALIDATE, 
KW_IREGEXP, KW_IS,
-  KW_JDBC, KW_JOIN, KW_JSONFILE, KW_KUDU, KW_LAST, KW_LEADING, KW_LEFT, 
KW_LEXICAL, KW_LIKE,
-  KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_LOGICAL_OR, 
KW_MANAGED_LOCATION, KW_MAP, KW_MATCHED,
-  KW_MERGE, KW_MERGE_FN, KW_METADATA, KW_MINUS, KW_NON, KW_NORELY, KW_NOT,
+  KW_JDBC, KW_JOIN, KW_JSONFILE, KW_KILL, KW_KUDU, KW_LAST, KW_LEADING, 
KW_LEFT, KW_LEXICAL,
+  KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_LOGICAL_OR, 
KW_MANAGED_LOCATION, KW_MAP,
+  KW_MATCHED, KW_MERGE, KW_MERGE_FN, KW_METADATA, KW_MINUS, KW_NON, KW_NORELY, 
KW_NOT,
   KW_NOVALIDATE, KW_NULL, KW_NULLS, KW_OF, KW_OFFSET, KW_ON, KW_OPTIMIZE, 
KW_OR,
   KW_ORC, KW_ORDER, KW_OUTER,
   KW_OVER, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, KW_PARTITION, 
KW_PARTITIONED,
-  KW_PARTITIONS, KW_PRECEDING, KW_PREPARE_FN, KW_PRIMARY, KW_PRODUCED, 
KW_PURGE,
+  KW_PARTITIONS, KW_PRECEDING, KW_PREPARE_FN, KW_PRIMARY, KW_PRODUCED, 
KW_PURGE, KW_QUERY,
   KW_RANGE, KW_RCFILE, KW_RECOVER, KW_REFERENCES, KW_REFRESH, KW_REGEXP, 
KW_RELY,
   KW_RENAME, KW_REPEATABLE, KW_REPLACE, KW_REPLICATION, KW_RESTRICT, 
KW_RETURNS,
   KW_REVOKE, KW_RIGHT, KW_RLIKE, KW_ROLE, KW_ROLES, KW_ROLLUP, KW_ROW, 
KW_ROWS, KW_RWSTORAGE,
@@ -355,10 +373,9 @@ terminal String UNMATCHED_STRING_LITERAL;
 terminal String UNEXPECTED_CHAR;
 terminal KW_CLUSTERED, KW_BUCKETS;
 
-// IMPALA-3726 introduced the DEFAULT keyword which could break existing 
applications
-// that use the identifier "KEYWORD" as database, column or table names. To 
avoid that,
-// the ident_or_default non-terminal is introduced and should be used instead 
of IDENT.
-nonterminal String ident_or_default;
+// An identifier or an "unreserved keyword". An unreserved keyword is a 
keyword that can
+// also be used as an identifier.
+nonterminal String ident_or_unreserved;
 // A word is an arbitrary token composed of digits and at least one letter. 
Reserved
 // words cannot be used as identifiers but they are words and can be used in 
query
 // options, column attributes, etc.
@@ -637,6 +654,9 @@ nonterminal AdminFnStmt admin_fn_stmt;
 // For "ALTER TABLE ... CONVERT TO" statements
 nonterminal ConvertTableToIcebergStmt convert_tbl_stmt;
 
+// For "KILL QUERY" statemnts
+nonterminal KillQueryStmt kill_query_stmt;
+
 precedence left KW_LOGICAL_OR;
 precedence left KW_OR;
 precedence left KW_AND;
@@ -794,6 +814,8 @@ stmt ::=
   {: RESULT = s; :}
   | convert_tbl_stmt: convert
   {: RESULT = convert; :}
+  | kill_query_stmt: kill_query
+  {: RESULT = kill_query; :}
   ;
 
 load_stmt ::=
@@ -825,7 +847,7 @@ reset_metadata_stmt ::=
   {: RESULT = ResetMetadataStmt.createRefreshTableStmt(table); :}
   | KW_REFRESH table_name:table partition_spec:partition
   {: RESULT = ResetMetadataStmt.createRefreshPartitionStmt(table, partition); 
:}
-  | KW_REFRESH KW_FUNCTIONS ident_or_default:db
+  | KW_REFRESH KW_FUNCTIONS ident_or_unreserved:db
   {: RESULT = ResetMetadataStmt.createRefreshFunctionsStmt(db); :}
   | KW_REFRESH KW_AUTHORIZATION
   {: RESULT = ResetMetadataStmt.createRefreshAuthorizationStmt(); :}
@@ -1147,34 +1169,34 @@ opt_kw_table ::=
 show_roles_stmt ::=
   KW_SHOW KW_ROLES
   {: RESULT = new ShowRolesStmt(false, null); :}
-  | KW_SHOW KW_ROLE KW_GRANT KW_GROUP ident_or_default:group
+  | KW_SHOW KW_ROLE KW_GRANT KW_GROUP ident_or_unreserved:group
   {: RESULT = new ShowRolesStmt(false, group); :}
   | KW_SHOW KW_CURRENT KW_ROLES
   {: RESULT = new ShowRolesStmt(true, null); :}
   ;
 
 show_grant_principal_stmt ::=
-  KW_SHOW KW_GRANT principal_type:type ident_or_default:name
+  KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name
   {: RESULT = new ShowGrantPrincipalStmt(name, type, null); :}
-  | KW_SHOW KW_GRANT principal_type:type ident_or_default:name KW_ON
+  | KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name KW_ON
   server_ident:server_kw
   {:
     RESULT = new ShowGrantPrincipalStmt(name, type,
         PrivilegeSpec.createServerScopedPriv(TPrivilegeLevel.ALL));
   :}
-  | KW_SHOW KW_GRANT principal_type:type ident_or_default:name KW_ON
-    KW_DATABASE ident_or_default:db_name
+  | KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name KW_ON
+    KW_DATABASE ident_or_unreserved:db_name
   {:
     RESULT = new ShowGrantPrincipalStmt(name, type,
         PrivilegeSpec.createDbScopedPriv(TPrivilegeLevel.ALL, db_name));
   :}
-  | KW_SHOW KW_GRANT principal_type:type ident_or_default:name KW_ON KW_TABLE
+  | KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name KW_ON 
KW_TABLE
   table_name:tbl_name
   {:
     RESULT = new ShowGrantPrincipalStmt(name, type,
         PrivilegeSpec.createTableScopedPriv(TPrivilegeLevel.ALL, tbl_name));
   :}
-  | KW_SHOW KW_GRANT principal_type:type ident_or_default:name KW_ON KW_COLUMN
+  | KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name KW_ON 
KW_COLUMN
   column_name:col_name
   {:
     RESULT = new ShowGrantPrincipalStmt(name, type,
@@ -1182,20 +1204,20 @@ show_grant_principal_stmt ::=
             col_name.getTableName(),
             Collections.singletonList(col_name.getColumnName())));
   :}
-  | KW_SHOW KW_GRANT principal_type:type ident_or_default:name KW_ON 
uri_ident:uri_kw
+  | KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name KW_ON 
uri_ident:uri_kw
     STRING_LITERAL:uri
   {:
     RESULT = new ShowGrantPrincipalStmt(name, type,
         PrivilegeSpec.createUriScopedPriv(TPrivilegeLevel.ALL, new 
HdfsUri(uri)));
   :}
-  | KW_SHOW KW_GRANT principal_type:type ident_or_default:name KW_ON 
KW_STORAGE_HANDLER_URI
+  | KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name KW_ON 
KW_STORAGE_HANDLER_URI
   STRING_LITERAL:storage_handler_uri
   {:
     RESULT = new ShowGrantPrincipalStmt(name, type,
         
PrivilegeSpec.createStorageHandlerUriScopedPriv(TPrivilegeLevel.RWSTORAGE,
         new StorageHandlerUri(storage_handler_uri)));
   :}
-  | KW_SHOW KW_GRANT principal_type:type ident_or_default:name KW_ON KW_UDF
+  | KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name KW_ON KW_UDF
   function_name:fn_name
   {:
     RESULT = new ShowGrantPrincipalStmt(name, type,
@@ -1204,38 +1226,38 @@ show_grant_principal_stmt ::=
   ;
 
 create_drop_role_stmt ::=
-  KW_CREATE KW_ROLE ident_or_default:role
+  KW_CREATE KW_ROLE ident_or_unreserved:role
   {: RESULT = new CreateDropRoleStmt(role, false); :}
-  | KW_DROP KW_ROLE ident_or_default:role
+  | KW_DROP KW_ROLE ident_or_unreserved:role
   {: RESULT = new CreateDropRoleStmt(role, true); :}
   ;
 
 grant_role_stmt ::=
-  KW_GRANT KW_ROLE ident_or_default:role KW_TO KW_GROUP ident_or_default:group
+  KW_GRANT KW_ROLE ident_or_unreserved:role KW_TO KW_GROUP 
ident_or_unreserved:group
   {: RESULT = new GrantRevokeRoleStmt(role, group, true); :}
   ;
 
 revoke_role_stmt ::=
-  KW_REVOKE KW_ROLE ident_or_default:role KW_FROM KW_GROUP 
ident_or_default:group
+  KW_REVOKE KW_ROLE ident_or_unreserved:role KW_FROM KW_GROUP 
ident_or_unreserved:group
   {: RESULT = new GrantRevokeRoleStmt(role, group, false); :}
   ;
 
 // For backwards compatibility, a grant without the principal type will 
default to
 // TPrincipalType.ROLE
 grant_privilege_stmt ::=
-  KW_GRANT privilege_spec:priv KW_TO KW_ROLE ident_or_default:role
+  KW_GRANT privilege_spec:priv KW_TO KW_ROLE ident_or_unreserved:role
   opt_with_grantopt:grant_opt
   {: RESULT = new GrantRevokePrivStmt(role, priv, true, grant_opt, 
TPrincipalType.ROLE); :}
-  | KW_GRANT privilege_spec:priv KW_TO ident_or_default:role
+  | KW_GRANT privilege_spec:priv KW_TO ident_or_unreserved:role
     opt_with_grantopt:grant_opt
   {: RESULT = new GrantRevokePrivStmt(role, priv, true, grant_opt, 
TPrincipalType.ROLE); :}
-  | KW_GRANT privilege_spec:priv KW_TO IDENT:user_id ident_or_default:user
+  | KW_GRANT privilege_spec:priv KW_TO IDENT:user_id ident_or_unreserved:user
     opt_with_grantopt:grant_opt
   {:
     parser.checkIdentKeyword("USER", user_id);
     RESULT = new GrantRevokePrivStmt(user, priv, true, grant_opt, 
TPrincipalType.USER);
   :}
-  | KW_GRANT privilege_spec:priv KW_TO KW_GROUP ident_or_default:group
+  | KW_GRANT privilege_spec:priv KW_TO KW_GROUP ident_or_unreserved:group
     opt_with_grantopt:grant_opt
   {: RESULT = new GrantRevokePrivStmt(group, priv, true, grant_opt, 
TPrincipalType.GROUP); :}
   ;
@@ -1244,28 +1266,28 @@ grant_privilege_stmt ::=
 // TPrincipalType.ROLE
 revoke_privilege_stmt ::=
   KW_REVOKE opt_grantopt_for:grant_opt privilege_spec:priv KW_FROM
-  KW_ROLE ident_or_default:role
+  KW_ROLE ident_or_unreserved:role
   {: RESULT = new GrantRevokePrivStmt(role, priv, false, grant_opt, 
TPrincipalType.ROLE); :}
   | KW_REVOKE opt_grantopt_for:grant_opt privilege_spec:priv KW_FROM
-    ident_or_default:role
+    ident_or_unreserved:role
   {: RESULT = new GrantRevokePrivStmt(role, priv, false, grant_opt, 
TPrincipalType.ROLE); :}
   | KW_REVOKE opt_grantopt_for:grant_opt privilege_spec:priv KW_FROM
-    IDENT:user_id ident_or_default:user
+    IDENT:user_id ident_or_unreserved:user
   {:
     parser.checkIdentKeyword("USER", user_id);
     RESULT = new GrantRevokePrivStmt(user, priv, false, grant_opt, 
TPrincipalType.USER);
   :}
   | KW_REVOKE opt_grantopt_for:grant_opt privilege_spec:priv KW_FROM
-    KW_GROUP ident_or_default:group
+    KW_GROUP ident_or_unreserved:group
   {: RESULT = new GrantRevokePrivStmt(group, priv, false, grant_opt, 
TPrincipalType.GROUP); :}
   ;
 
 privilege_spec ::=
   privilege:priv KW_ON server_ident:server_kw
   {: RESULT = PrivilegeSpec.createServerScopedPriv(priv); :}
-  | privilege:priv KW_ON server_ident:server_kw ident_or_default:server_name
+  | privilege:priv KW_ON server_ident:server_kw ident_or_unreserved:server_name
   {: RESULT = PrivilegeSpec.createServerScopedPriv(priv, server_name); :}
-  | privilege:priv KW_ON KW_DATABASE ident_or_default:db_name
+  | privilege:priv KW_ON KW_DATABASE ident_or_unreserved:db_name
   {: RESULT = PrivilegeSpec.createDbScopedPriv(priv, db_name); :}
   | privilege:priv KW_ON KW_TABLE table_name:tbl_name
   {: RESULT = PrivilegeSpec.createTableScopedPriv(priv, tbl_name); :}
@@ -1346,7 +1368,7 @@ partition_def_list ::=
   ;
 
 comment_on_stmt ::=
-  KW_COMMENT KW_ON KW_DATABASE ident_or_default:db_name KW_IS 
nullable_comment_val:comment
+  KW_COMMENT KW_ON KW_DATABASE ident_or_unreserved:db_name KW_IS 
nullable_comment_val:comment
   {: RESULT = new CommentOnDbStmt(db_name, comment); :}
   | KW_COMMENT KW_ON KW_TABLE table_name:table KW_IS 
nullable_comment_val:comment
   {: RESULT = new CommentOnTableStmt(table, comment); :}
@@ -1360,15 +1382,15 @@ comment_on_stmt ::=
 // such that any names that use OWNER or USER will need to be escaped. By 
using IDENT
 // token we can make OWNER and USER to be keywords only in these statements.
 alter_db_stmt ::=
-  KW_ALTER KW_DATABASE ident_or_default:db KW_SET IDENT:owner_id IDENT:user_id
-  ident_or_default:user
+  KW_ALTER KW_DATABASE ident_or_unreserved:db KW_SET IDENT:owner_id 
IDENT:user_id
+  ident_or_unreserved:user
   {:
     parser.checkIdentKeyword("OWNER", owner_id);
     parser.checkIdentKeyword("USER", user_id);
     RESULT = new AlterDbSetOwnerStmt(db, new Owner(TOwnerType.USER, user));
   :}
-  | KW_ALTER KW_DATABASE ident_or_default:db KW_SET IDENT:owner_id KW_ROLE
-    ident_or_default:role
+  | KW_ALTER KW_DATABASE ident_or_unreserved:db KW_SET IDENT:owner_id KW_ROLE
+    ident_or_unreserved:role
   {:
     parser.checkIdentKeyword("OWNER", owner_id);
     RESULT = new AlterDbSetOwnerStmt(db, new Owner(TOwnerType.ROLE, role));
@@ -1398,7 +1420,7 @@ alter_tbl_stmt ::=
   | KW_ALTER KW_TABLE table_name:table KW_ADD if_not_exists_val:if_not_exists
     partition_def_list:partitions KW_SET KW_FILEFORMAT 
file_format_val:file_format
   {: RESULT = new AlterTableAddPartitionStmt(table, if_not_exists, partitions, 
file_format); :}
-  | KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column 
ident_or_default:col_name
+  | KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column 
ident_or_unreserved:col_name
   {: RESULT = new AlterTableDropColStmt(table, col_name); :}
   | KW_ALTER KW_TABLE table_name:table KW_ADD if_not_exists_val:if_not_exists
     KW_RANGE range_param:partition
@@ -1406,7 +1428,7 @@ alter_tbl_stmt ::=
     RESULT = new AlterTableAddDropRangePartitionStmt(table, partition, 
if_not_exists,
         Operation.ADD);
   :}
-  | KW_ALTER KW_TABLE table_name:table KW_CHANGE opt_kw_column 
ident_or_default:col_name
+  | KW_ALTER KW_TABLE table_name:table KW_CHANGE opt_kw_column 
ident_or_unreserved:col_name
     column_def:col_def
   {: RESULT = AlterTableAlterColStmt.createChangeColStmt(table, col_name, 
col_def); :}
   | KW_ALTER KW_TABLE table_name:table KW_DROP if_exists_val:if_exists
@@ -1453,7 +1475,7 @@ alter_tbl_stmt ::=
     RESULT = new AlterTableSortByStmt(table, col_names, TSortingOrder.ZORDER);
   :}
   | KW_ALTER KW_TABLE table_name:table opt_partition_set:partition KW_SET
-    KW_COLUMN KW_STATS ident_or_default:col LPAREN properties_map:map RPAREN
+    KW_COLUMN KW_STATS ident_or_unreserved:col LPAREN properties_map:map RPAREN
   {:
     // See above for special partition clause handling.
     if (partition != null) parser.parseError("set", SqlParserSymbols.KW_SET);
@@ -1471,17 +1493,17 @@ alter_tbl_stmt ::=
   :}
   | KW_ALTER KW_TABLE table_name:table KW_RECOVER KW_PARTITIONS
   {: RESULT = new AlterTableRecoverPartitionsStmt(table); :}
-  | KW_ALTER KW_TABLE table_name:table KW_ALTER opt_kw_column 
ident_or_default:col_name
+  | KW_ALTER KW_TABLE table_name:table KW_ALTER opt_kw_column 
ident_or_unreserved:col_name
     KW_SET column_options_map:options
   {:
     RESULT = new AlterTableAlterColStmt(
         table, col_name, new ColumnDef(col_name, null, options));
   :}
-  | KW_ALTER KW_TABLE table_name:table KW_ALTER opt_kw_column 
ident_or_default:col_name
+  | KW_ALTER KW_TABLE table_name:table KW_ALTER opt_kw_column 
ident_or_unreserved:col_name
     KW_DROP KW_DEFAULT
   {: RESULT = AlterTableAlterColStmt.createDropDefaultStmt(table, col_name); :}
   | KW_ALTER KW_TABLE table_name:table opt_partition_set:partitions KW_SET 
IDENT:owner_id
-    IDENT:user_id ident_or_default:user
+    IDENT:user_id ident_or_unreserved:user
   {:
     // See above for special partition clause handling.
     if (partitions != null) parser.parseError("set", SqlParserSymbols.KW_SET);
@@ -1490,7 +1512,7 @@ alter_tbl_stmt ::=
     RESULT = new AlterTableSetOwnerStmt(table, new Owner(TOwnerType.USER, 
user));
   :}
   | KW_ALTER KW_TABLE table_name:table opt_partition_set:partitions KW_SET 
IDENT:owner_id
-    KW_ROLE ident_or_default:role
+    KW_ROLE ident_or_unreserved:role
   {:
     // See above for special partition clause handling.
     if (partitions != null) parser.parseError("set", SqlParserSymbols.KW_SET);
@@ -1521,6 +1543,13 @@ convert_tbl_stmt ::=
   {: RESULT =  new ConvertTableToIcebergStmt(table, props); :}
   ;
 
+kill_query_stmt ::=
+  KW_KILL KW_QUERY STRING_LITERAL:query_id
+  {:
+    RESULT = new KillQueryStmt(query_id);
+  :}
+  ;
+
 table_property_type ::=
   KW_TBLPROPERTIES
   {: RESULT = TTablePropertyType.TBL_PROPERTY; :}
@@ -1534,7 +1563,7 @@ opt_kw_column ::=
   ;
 
 create_db_stmt ::=
-  KW_CREATE db_or_schema_kw if_not_exists_val:if_not_exists 
ident_or_default:db_name
+  KW_CREATE db_or_schema_kw if_not_exists_val:if_not_exists 
ident_or_unreserved:db_name
   opt_comment_val:comment location_val:location 
managed_location_val:managed_location
   {: RESULT = new CreateDbStmt(db_name, comment, location,
                                managed_location, if_not_exists);
@@ -1645,7 +1674,7 @@ create_tbl_stmt ::=
     RESULT = new CreateTableStmt(tbl_def);
   :}
   | KW_CREATE tbl_def_with_col_defs:tbl_def
-    KW_PRODUCED KW_BY KW_DATA source_ident:is_source_id 
ident_or_default:data_src_name
+    KW_PRODUCED KW_BY KW_DATA source_ident:is_source_id 
ident_or_unreserved:data_src_name
     opt_init_string_val:init_string
     opt_comment_val:comment
   {:
@@ -2083,21 +2112,21 @@ iceberg_partition_field_defs ::=
   ;
 
 iceberg_partition_field_def ::=
-  IDENT:col_name
+  ident_or_unreserved:col_name
   {:
     RESULT = new IcebergPartitionField(col_name,
                                        
IcebergUtil.getPartitionTransform("IDENTITY"));
   :}
   |
   iceberg_partition_transform_type:partition_transform
-  LPAREN IDENT:col_name RPAREN
+  LPAREN ident_or_unreserved:col_name RPAREN
   {:
     RESULT = new IcebergPartitionField(col_name, 
IcebergUtil.getPartitionTransform(
                                        partition_transform));
   :}
   | iceberg_partition_transform_type:partition_transform
     LPAREN
-      INTEGER_LITERAL:transform_param COMMA IDENT:col_name
+      INTEGER_LITERAL:transform_param COMMA ident_or_unreserved:col_name
     RPAREN
   {:
     RESULT = new IcebergPartitionField(col_name, 
IcebergUtil.getPartitionTransform(
@@ -2367,9 +2396,9 @@ column_def_list ::=
   ;
 
 column_def ::=
-  ident_or_default:col_name type_def:type column_options_map:options
+  ident_or_unreserved:col_name type_def:type column_options_map:options
   {: RESULT = new ColumnDef(col_name, type, options); :}
-  | ident_or_default:col_name type_def:type
+  | ident_or_unreserved:col_name type_def:type
   {: RESULT = new ColumnDef(col_name, type); :}
   ;
 
@@ -2454,7 +2483,7 @@ create_view_stmt ::=
 
 create_data_src_stmt ::=
   KW_CREATE KW_DATA source_ident:is_source_id
-  if_not_exists_val:if_not_exists ident_or_default:data_src_name
+  if_not_exists_val:if_not_exists ident_or_unreserved:data_src_name
   location_val:location
   KW_CLASS STRING_LITERAL:class_name
   KW_API_VERSION STRING_LITERAL:api_version
@@ -2574,7 +2603,7 @@ view_column_def_list ::=
   ;
 
 view_column_def ::=
-  ident_or_default:col_name opt_comment_val:comment
+  ident_or_unreserved:col_name opt_comment_val:comment
   {:
     LinkedHashMap<Option, Object> options = Maps.newLinkedHashMap();
     if (comment != null) options.put(Option.COMMENT, comment);
@@ -2589,13 +2618,13 @@ alter_view_stmt ::=
   | KW_ALTER KW_VIEW table_name:before_table KW_RENAME KW_TO 
table_name:new_table
   {: RESULT = new AlterTableOrViewRenameStmt(before_table, new_table, false); 
:}
   | KW_ALTER KW_VIEW table_name:table KW_SET IDENT:owner_id IDENT:user_id
-    ident_or_default:user
+    ident_or_unreserved:user
   {:
     parser.checkIdentKeyword("OWNER", owner_id);
     parser.checkIdentKeyword("USER", user_id);
     RESULT = new AlterViewSetOwnerStmt(table, new Owner(TOwnerType.USER, 
user));
   :}
-  | KW_ALTER KW_VIEW table_name:table KW_SET IDENT:owner_id KW_ROLE 
ident_or_default:role
+  | KW_ALTER KW_VIEW table_name:table KW_SET IDENT:owner_id KW_ROLE 
ident_or_unreserved:role
   {:
     parser.checkIdentKeyword("OWNER", owner_id);
     RESULT = new AlterViewSetOwnerStmt(table, new Owner(TOwnerType.ROLE, 
role));
@@ -2647,7 +2676,7 @@ drop_stats_stmt ::=
   ;
 
 drop_db_stmt ::=
-  KW_DROP db_or_schema_kw if_exists_val:if_exists ident_or_default:db_name
+  KW_DROP db_or_schema_kw if_exists_val:if_exists ident_or_unreserved:db_name
   cascade_val:cascade
   {: RESULT = new DropDbStmt(db_name, if_exists, cascade); :}
   ;
@@ -2671,7 +2700,7 @@ drop_function_stmt ::=
 
 drop_data_src_stmt ::=
   KW_DROP KW_DATA source_ident:is_source_id if_exists_val:if_exists
-  ident_or_default:data_src_name
+  ident_or_unreserved:data_src_name
   {: RESULT = new DropDataSrcStmt(data_src_name, if_exists); :}
   ;
 
@@ -2761,7 +2790,7 @@ static_partition_key_value_list ::=
 
 partition_key_value ::=
   // Dynamic partition key values.
-  ident_or_default:column
+  ident_or_unreserved:column
   {: RESULT = new PartitionKeyValue(column, null); :}
   | static_partition_key_value:partition
   {: RESULT = partition; :}
@@ -2769,7 +2798,7 @@ partition_key_value ::=
 
 static_partition_key_value ::=
   // Static partition key values.
-  ident_or_default:column EQUAL expr:e
+  ident_or_unreserved:column EQUAL expr:e
   {: RESULT = new PartitionKeyValue(column, e); :}
   ;
 
@@ -2903,11 +2932,11 @@ opt_with_clause ::=
   ;
 
 with_view_def ::=
-  ident_or_default:alias KW_AS LPAREN query_stmt:query RPAREN
+  ident_or_unreserved:alias KW_AS LPAREN query_stmt:query RPAREN
   {: RESULT = new View(alias, query, null); :}
   | STRING_LITERAL:alias KW_AS LPAREN query_stmt:query RPAREN
   {: RESULT = new View(alias, query, null); :}
-  | ident_or_default:alias LPAREN ident_list:col_names RPAREN KW_AS LPAREN
+  | ident_or_unreserved:alias LPAREN ident_list:col_names RPAREN KW_AS LPAREN
     query_stmt:query RPAREN
   {: RESULT = new View(alias, query, col_names); :}
   | STRING_LITERAL:alias LPAREN ident_list:col_names RPAREN
@@ -3057,7 +3086,7 @@ values_operand_list ::=
   ;
 
 use_stmt ::=
-  KW_USE ident_or_default:db
+  KW_USE ident_or_unreserved:db
   {: RESULT = new UseStmt(db); :}
   ;
 
@@ -3066,9 +3095,9 @@ show_tables_stmt ::=
   {: RESULT = new ShowTablesStmt(); :}
   | KW_SHOW KW_TABLES show_pattern:showPattern
   {: RESULT = new ShowTablesStmt(showPattern); :}
-  | KW_SHOW KW_TABLES KW_IN ident_or_default:db
+  | KW_SHOW KW_TABLES KW_IN ident_or_unreserved:db
   {: RESULT = new ShowTablesStmt(db, null); :}
-  | KW_SHOW KW_TABLES KW_IN ident_or_default:db show_pattern:showPattern
+  | KW_SHOW KW_TABLES KW_IN ident_or_unreserved:db show_pattern:showPattern
   {: RESULT = new ShowTablesStmt(db, showPattern); :}
   ;
 
@@ -3086,9 +3115,9 @@ show_views_stmt ::=
   {: RESULT = new ShowViewsStmt(); :}
   | KW_SHOW KW_VIEWS show_pattern:showPattern
   {: RESULT = new ShowViewsStmt(showPattern); :}
-  | KW_SHOW KW_VIEWS KW_IN ident_or_default:db
+  | KW_SHOW KW_VIEWS KW_IN ident_or_unreserved:db
   {: RESULT = new ShowViewsStmt(db, null); :}
-  | KW_SHOW KW_VIEWS KW_IN ident_or_default:db show_pattern:showPattern
+  | KW_SHOW KW_VIEWS KW_IN ident_or_unreserved:db show_pattern:showPattern
   {: RESULT = new ShowViewsStmt(db, showPattern); :}
   ;
 
@@ -3126,9 +3155,9 @@ show_functions_stmt ::=
   {: RESULT = new ShowFunctionsStmt(null, null, fn_type); :}
   | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS show_pattern:showPattern
   {: RESULT = new ShowFunctionsStmt(null, showPattern, fn_type); :}
-  | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN 
ident_or_default:db
+  | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN 
ident_or_unreserved:db
   {: RESULT = new ShowFunctionsStmt(db, null, fn_type); :}
-  | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN 
ident_or_default:db
+  | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN 
ident_or_unreserved:db
       show_pattern:showPattern
   {: RESULT = new ShowFunctionsStmt(db, showPattern, fn_type); :}
   ;
@@ -3181,7 +3210,7 @@ show_files_stmt ::=
   ;
 
 describe_stmt ::=
-  KW_DESCRIBE db_or_schema_kw describe_output_style:style ident_or_default:db
+  KW_DESCRIBE db_or_schema_kw describe_output_style:style 
ident_or_unreserved:db
   {: RESULT = new DescribeDbStmt(db, style); :}
   | KW_DESCRIBE KW_FORMATTED dotted_path:path
   {: RESULT = new DescribeTableStmt(path, TDescribeOutputStyle.FORMATTED); :}
@@ -3254,15 +3283,15 @@ select_clause ::=
   ;
 
 set_stmt ::=
-  KW_SET ident_or_default:key EQUAL numeric_literal:l
+  KW_SET ident_or_unreserved:key EQUAL numeric_literal:l
   {: RESULT = new SetStmt(key, l.getStringValue(), TQueryOptionType.SET_ONE); 
:}
-  | KW_SET ident_or_default:key EQUAL STRING_LITERAL:l
+  | KW_SET ident_or_unreserved:key EQUAL STRING_LITERAL:l
   {: RESULT = new SetStmt(key, l, TQueryOptionType.SET_ONE); :}
-  | KW_SET ident_or_default:key EQUAL SUBTRACT numeric_literal:l
+  | KW_SET ident_or_unreserved:key EQUAL SUBTRACT numeric_literal:l
   {:
     l.swapSign();
     RESULT = new SetStmt(key, l.getStringValue(), TQueryOptionType.SET_ONE); :}
-  | KW_SET ident_or_default:key EQUAL word:value
+  | KW_SET ident_or_unreserved:key EQUAL word:value
   {: RESULT = new SetStmt(key, value, TQueryOptionType.SET_ONE); :}
   | KW_SET KW_ALL
   {: RESULT = new SetStmt(null, null, TQueryOptionType.SET_ALL); :}
@@ -3277,9 +3306,9 @@ unset_stmt ::=
 
 // Top-level function call, e.g. ": shutdown()", used for admin commands, etc.
 admin_fn_stmt ::=
-    COLON ident_or_default:fn_name LPAREN RPAREN
+    COLON ident_or_unreserved:fn_name LPAREN RPAREN
   {: RESULT = new AdminFnStmt(fn_name, Collections.<Expr>emptyList()); :}
-  | COLON ident_or_default:fn_name LPAREN expr_list:params RPAREN
+  | COLON ident_or_unreserved:fn_name LPAREN expr_list:params RPAREN
   {: RESULT = new AdminFnStmt(fn_name, params); :}
   ;
 
@@ -3307,9 +3336,9 @@ select_list_item ::=
   ;
 
 alias_clause ::=
-  KW_AS ident_or_default:ident
+  KW_AS ident_or_unreserved:ident
   {: RESULT = ident; :}
-  | ident_or_default:ident
+  | ident_or_unreserved:ident
   {: RESULT = ident; :}
   | KW_AS STRING_LITERAL:l
   {: RESULT = l; :}
@@ -3325,16 +3354,16 @@ star_expr ::=
   ;
 
 table_name ::=
-  ident_or_default:tbl
+  ident_or_unreserved:tbl
   {: RESULT = new TableName(null, tbl); :}
-  | ident_or_default:db DOT ident_or_default:tbl
+  | ident_or_unreserved:db DOT ident_or_unreserved:tbl
   {: RESULT = new TableName(db, tbl); :}
   ;
 
 column_name ::=
-  | ident_or_default:tbl DOT ident_or_default:col
+  | ident_or_unreserved:tbl DOT ident_or_unreserved:col
   {: RESULT = new ColumnName(new TableName(null, tbl), col); :}
-  | ident_or_default:db DOT ident_or_default:tbl DOT ident_or_default:col
+  | ident_or_unreserved:db DOT ident_or_unreserved:tbl DOT 
ident_or_unreserved:col
   {: RESULT = new ColumnName(new TableName(db, tbl), col); :}
   ;
 
@@ -3546,11 +3575,11 @@ plan_hint ::=
   {: RESULT = new PlanHint("straight_join"); :}
   | KW_CLUSTERED
   {: RESULT = new PlanHint("clustered"); :}
-  | IDENT:name
+  | ident_or_unreserved:name
   {: RESULT = new PlanHint(name); :}
-  | IDENT:name LPAREN ident_list:args RPAREN
+  | ident_or_unreserved:name LPAREN ident_list:args RPAREN
   {: RESULT = new PlanHint(name, args); :}
-  | IDENT:name LPAREN INTEGER_LITERAL:value RPAREN
+  | ident_or_unreserved:name LPAREN INTEGER_LITERAL:value RPAREN
   {:
     if (value != null) {
       RESULT = new PlanHint(name,
@@ -3577,13 +3606,13 @@ plan_hint_list ::=
   ;
 
 ident_list ::=
-  ident_or_default:ident
+  ident_or_unreserved:ident
   {:
     List<String> list = new ArrayList<>();
     list.add(ident);
     RESULT = list;
   :}
-  | ident_list:list COMMA ident_or_default:ident
+  | ident_list:list COMMA ident_or_unreserved:ident
   {:
     list.add(ident);
     RESULT = list;
@@ -3913,7 +3942,7 @@ function_call_expr ::=
   // avoid adding new keywords.
   // Common syntax for "EXTRACT(unit FROM TIMESTAMP/DATE ts)"
   // and "TRIM(where FROM string)":
-  | function_name:fn_name LPAREN ident_or_default:i KW_FROM expr:e RPAREN
+  | function_name:fn_name LPAREN ident_or_unreserved:i KW_FROM expr:e RPAREN
   {:
     if (fn_name.toString().toLowerCase().endsWith("extract")) {
       RESULT = new ExtractFromExpr(fn_name, i, e);
@@ -4264,13 +4293,13 @@ slot_ref ::=
   ;
 
 dotted_path ::=
-  ident_or_default:ident
+  ident_or_unreserved:ident
   {:
     List<String> list = new ArrayList<>();
     list.add(ident);
     RESULT = list;
   :}
-  | dotted_path:list DOT ident_or_default:ident
+  | dotted_path:list DOT ident_or_unreserved:ident
   {:
     list.add(ident);
     RESULT = list;
@@ -4349,11 +4378,17 @@ struct_field_def_list ::=
   :}
   ;
 
-ident_or_default ::=
+// NOTE: If a new keyword is added here, it should also be added to the
+// isUnreservedKeyword() method.
+ident_or_unreserved ::=
   IDENT:name
   {: RESULT = name.toString(); :}
   | KW_DEFAULT:name
   {: RESULT = name.toString(); :}
+  | KW_KILL:name
+  {: RESULT = name.toString(); :}
+  | KW_QUERY:name
+  {: RESULT = name.toString(); :}
   ;
 
 word ::=
@@ -4587,6 +4622,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_JSONFILE:r
   {: RESULT = r.toString(); :}
+  | KW_KILL:r
+  {: RESULT = r.toString(); :}
   | KW_KUDU:r
   {: RESULT = r.toString(); :}
   | KW_LAST:r
@@ -4675,6 +4712,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_PURGE:r
   {: RESULT = r.toString(); :}
+  | KW_QUERY:r
+  {: RESULT = r.toString(); :}
   | KW_RANGE:r
   {: RESULT = r.toString(); :}
   | KW_RCFILE:r
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java 
b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index 6f28c3a9f..b23dc9a71 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -224,6 +224,10 @@ public class AnalysisContext {
       return stmt_ instanceof ConvertTableToIcebergStmt;
     }
 
+    public boolean isKillQueryStmt() {
+      return stmt_ instanceof KillQueryStmt;
+    }
+
     public AlterTableStmt getAlterTableStmt() {
       Preconditions.checkState(isAlterTableStmt());
       return (AlterTableStmt) stmt_;
@@ -418,6 +422,11 @@ public class AnalysisContext {
       return (ConvertTableToIcebergStmt) stmt_;
     }
 
+    public KillQueryStmt getKillQueryStmt() {
+      Preconditions.checkState(isKillQueryStmt());
+      return (KillQueryStmt) stmt_;
+    }
+
     public StatementBase getStmt() { return stmt_; }
     public Analyzer getAnalyzer() { return analyzer_; }
     public Set<TAccessEvent> getAccessEvents() { return 
analyzer_.getAccessEvents(); }
diff --git a/fe/src/main/java/org/apache/impala/analysis/KillQueryStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/KillQueryStmt.java
new file mode 100644
index 000000000..b344d9ff6
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KillQueryStmt.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
+import org.apache.impala.authorization.AuthorizationConfig;
+import org.apache.impala.authorization.User;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.TKillQueryReq;
+import org.apache.impala.thrift.TUniqueId;
+import org.apache.impala.util.TUniqueIdUtil;
+
+import com.google.common.base.Preconditions;
+
+public final class KillQueryStmt extends StatementBase {
+  private final String queryIdString_;
+  private TUniqueId queryId_;
+  private User requestingUser_;
+  private boolean requestedByAdmin_ = true;
+
+  public KillQueryStmt(String queryId) {
+    queryIdString_ = queryId;
+  }
+
+  @Override
+  public String toSql(ToSqlOptions options) {
+    return String.format("KILL QUERY '%s'", queryIdString_);
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    try {
+      queryId_ = TUniqueIdUtil.ParseId(queryIdString_);
+    } catch (NumberFormatException e) {
+      throw new AnalysisException(
+          String.format("Invalid query id: '%s'", queryIdString_));
+    }
+    requestingUser_ = analyzer.getUser();
+    AuthorizationConfig authzConfig = analyzer.getAuthzConfig();
+    if (authzConfig.isEnabled()) {
+      // Check whether the user is an admin (i.e. user with ALL privilege on 
server).
+      String authzServer = authzConfig.getServerName();
+      Preconditions.checkNotNull(authzServer);
+      analyzer.setMaskPrivChecks(null);
+      analyzer.registerPrivReq(builder -> 
builder.onServer(authzServer).all().build());
+    }
+  }
+
+  public TKillQueryReq toThrift() {
+    return new TKillQueryReq(queryId_, requestingUser_.getName(), 
requestedByAdmin_);
+  }
+
+  @Override
+  public void handleAuthorizationException(AnalysisResult analysisResult) {
+    requestedByAdmin_ = false;
+
+    // By default, a user will not be authorized to access the runtime profile 
if a
+    // masked privilege request fails. This will disallow any non-admin user 
to access
+    // the runtime profile even if the user can kill the query.
+    //
+    // Set to true to allow non-admin users to access the runtime profile.
+    analysisResult.setUserHasProfileAccess(true);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java 
b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
index 8f45c0e40..7512e2026 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import java.util.Optional;
 import org.apache.commons.lang3.NotImplementedException;
+import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
@@ -274,4 +275,27 @@ public abstract class StatementBase extends StmtNode {
     }
     return srcExpr.castTo(compatType, compatibilityLevel);
   }
+
+  /**
+   * This method is designed to allow custom authorization exception handling. 
A
+   * statement class that extends StatementBase can customize how it handles an
+   * authorization exception by
+   * 1. Registering a masked privilege request and set the associated error 
message to
+   *    null, so that this custom handler will be called when an authorization 
exception
+   *    is thrown;
+   * 2. Overriding this method to specify how to handle the authorization 
exception for
+   *    this statement class.
+   *
+   * By default, the handler ignores the exception. If a statement class registers
+   * masked privilege requests with the error message set to null, it might need to
+   * override this method.
+   *
+   * NOTE: Currently access to the runtime profile is disabled when a masked
+   * authorization exception is thrown before this handler gets called (Refer 
to
+   * BaseAuthorizationChecker.authorize() for details).
+   * @param analysisResult
+   */
+  public void handleAuthorizationException(AnalysisResult analysisResult) {
+    // Do nothing
+  }
 }
diff --git 
a/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
 
b/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
index d4ea0df53..d43aa76f4 100644
--- 
a/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
+++ 
b/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
@@ -177,10 +177,9 @@ public abstract class BaseAuthorizationChecker implements 
AuthorizationChecker {
     }
 
     // Check all masked requests. If a masked request has an associated error 
message,
-    // an AuthorizationException is thrown if authorization fails. Masked 
requests with
-    // no error message are used to check if the user can access the runtime 
profile.
-    // These checks don't result in an AuthorizationException but set the
-    // 'user_has_profile_access' flag in queryCtx_.
+    // an AuthorizationException is thrown if authorization fails. Otherwise, 
the custom
+    // AuthorizationException handler of the statement will be called and the 
user will
+    // not be authorized to access the runtime profile by default if 
authorization fails.
     for (Pair<PrivilegeRequest, String> maskedReq : 
analyzer.getMaskedPrivilegeReqs()) {
       try {
         authzCtx.setRetainAudits(false);
@@ -189,6 +188,8 @@ public abstract class BaseAuthorizationChecker implements 
AuthorizationChecker {
         analysisResult.setUserHasProfileAccess(false);
         if (!Strings.isNullOrEmpty(maskedReq.second)) {
           throw new AuthorizationException(maskedReq.second);
+        } else {
+          
analysisResult.getStmt().handleAuthorizationException(analysisResult);
         }
         break;
       } finally {
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 539d5dedd..4f5146dfc 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -2711,6 +2711,12 @@ public class Frontend {
           createCatalogOpRequest(analysisResult, result);
         }
         return result;
+      } else if (analysisResult.isKillQueryStmt()) {
+        result.stmt_type = TStmtType.KILL;
+        result.setResult_set_metadata(new TResultSetMetadata(
+            Collections.singletonList(new TColumn("result", 
Type.STRING.toThrift()))));
+        
result.setKill_query_request(analysisResult.getKillQueryStmt().toThrift());
+        return result;
       }
 
       // Open or continue Kudu transaction if Kudu transaction is enabled and 
target table
diff --git a/fe/src/main/jflex/sql-scanner.flex 
b/fe/src/main/jflex/sql-scanner.flex
index bfbc3c72f..be7c86811 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -179,6 +179,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("jdbc", SqlParserSymbols.KW_JDBC);
     keywordMap.put("join", SqlParserSymbols.KW_JOIN);
     keywordMap.put("jsonfile", SqlParserSymbols.KW_JSONFILE);
+    keywordMap.put("kill", SqlParserSymbols.KW_KILL);
     keywordMap.put("kudu", SqlParserSymbols.KW_KUDU);
     keywordMap.put("last", SqlParserSymbols.KW_LAST);
     keywordMap.put("leading", SqlParserSymbols.KW_LEADING);
@@ -223,6 +224,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("primary", SqlParserSymbols.KW_PRIMARY);
     keywordMap.put("produced", SqlParserSymbols.KW_PRODUCED);
     keywordMap.put("purge", SqlParserSymbols.KW_PURGE);
+    keywordMap.put("query", SqlParserSymbols.KW_QUERY);
     keywordMap.put("range", SqlParserSymbols.KW_RANGE);
     keywordMap.put("rcfile", SqlParserSymbols.KW_RCFILE);
     keywordMap.put("real", SqlParserSymbols.KW_DOUBLE);
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java 
b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 80d503308..f86bc96d7 100755
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -3708,8 +3708,9 @@ public class ParserTest extends FrontendTestBase {
         "^\n" +
         "Encountered: IDENTIFIER\n" +
         "Expected: ALTER, COMMENT, COMPUTE, COPY, CREATE, DELETE, DESCRIBE, 
DROP, " +
-        "EXPLAIN, GRANT, INSERT, INVALIDATE, LOAD, MERGE, OPTIMIZE, REFRESH, 
REVOKE, " +
-        "SELECT, SET, SHOW, TRUNCATE, UNSET, UPDATE, UPSERT, USE, VALUES, 
WITH\n");
+        "EXPLAIN, GRANT, INSERT, INVALIDATE, KILL, LOAD, MERGE, OPTIMIZE, 
REFRESH, " +
+        "REVOKE, SELECT, SET, SHOW, TRUNCATE, UNSET, UPDATE, UPSERT, USE, 
VALUES, " +
+        "WITH\n");
 
     // missing select list
     ParserError("select from t",
@@ -3717,7 +3718,7 @@ public class ParserTest extends FrontendTestBase {
         "select from t\n" +
         "       ^\n" +
         "Encountered: FROM\n" +
-        "Expected: ALL, CASE, CAST, DATE, DEFAULT, DISTINCT, EXISTS, FALSE, 
GROUPING, " +
+        "Expected: ALL, CASE, CAST, DATE, DISTINCT, EXISTS, FALSE, GROUPING, " 
+
         "IF, INTERVAL, LEFT, NOT, NULL, REPLACE, RIGHT, STRAIGHT_JOIN, 
TRUNCATE, TRUE, " +
         "UNNEST, IDENTIFIER\n" +
         "\n" +
@@ -3730,7 +3731,7 @@ public class ParserTest extends FrontendTestBase {
         "select c, b, c where a = 5\n" +
         "               ^\n" +
         "Encountered: WHERE\n" +
-        "Expected: AND, AS, BETWEEN, DEFAULT, DIV, EXCEPT, FROM, ILIKE, IN, 
INTERSECT, " +
+        "Expected: AND, AS, BETWEEN, DIV, EXCEPT, FROM, ILIKE, IN, INTERSECT, 
" +
         "IREGEXP, IS, LIKE, LIMIT, ||, MINUS, NOT, OR, ORDER, REGEXP, RLIKE, 
UNION, " +
         "COMMA, IDENTIFIER\n" +
         "\n" +
@@ -3743,7 +3744,7 @@ public class ParserTest extends FrontendTestBase {
         "select c, b, c from where a = 5\n" +
         "                    ^\n" +
         "Encountered: WHERE\n" +
-        "Expected: DEFAULT, UNNEST, IDENTIFIER\n" +
+        "Expected: UNNEST, IDENTIFIER\n" +
         "\n" +
         "Hint: reserved words have to be escaped when used as an identifier, 
e.g. `where`"
         );
@@ -3754,7 +3755,7 @@ public class ParserTest extends FrontendTestBase {
         "select c, b, c from t where\n" +
         "                           ^\n" +
         "Encountered: EOF\n" +
-        "Expected: CASE, CAST, DATE, DEFAULT, EXISTS, FALSE, GROUPING, IF, 
INTERVAL, " +
+        "Expected: CASE, CAST, DATE, EXISTS, FALSE, GROUPING, IF, INTERVAL, " +
         "LEFT, NOT, NULL, REPLACE, RIGHT, STRAIGHT_JOIN, TRUNCATE, TRUE, 
UNNEST, " +
         "IDENTIFIER");
 
@@ -3764,7 +3765,7 @@ public class ParserTest extends FrontendTestBase {
         "select c, b, c from t where group by a, b\n" +
         "                            ^\n" +
         "Encountered: GROUP\n" +
-        "Expected: CASE, CAST, DATE, DEFAULT, EXISTS, FALSE, GROUPING, IF, 
INTERVAL, " +
+        "Expected: CASE, CAST, DATE, EXISTS, FALSE, GROUPING, IF, INTERVAL, " +
         "LEFT, NOT, NULL, REPLACE, RIGHT, STRAIGHT_JOIN, TRUNCATE, TRUE, 
UNNEST, " +
         "IDENTIFIER\n" +
         "\n" +
@@ -3831,7 +3832,7 @@ public class ParserTest extends FrontendTestBase {
         "...c,c,c,c,c,c,c,c,cd,c,d,d, ,c, from t\n" +
         "                             ^\n" +
         "Encountered: COMMA\n" +
-        "Expected: CASE, CAST, DATE, DEFAULT, EXISTS, FALSE, GROUPING, IF, 
INTERVAL, " +
+        "Expected: CASE, CAST, DATE, EXISTS, FALSE, GROUPING, IF, INTERVAL, " +
         "LEFT, NOT, NULL, REPLACE, RIGHT, TRUNCATE, TRUE, UNNEST, IDENTIFIER");
 
     // Parsing identifiers that have different names printed as EXPECTED
@@ -3852,7 +3853,7 @@ public class ParserTest extends FrontendTestBase {
         "USE ` `\n" +
         "    ^\n" +
         "Encountered: EMPTY IDENTIFIER\n" +
-        "Expected: DEFAULT, IDENTIFIER\n");
+        "Expected: IDENTIFIER\n");
 
     // Expecting = token
     ParserError("SET foo",
@@ -3868,7 +3869,7 @@ public class ParserTest extends FrontendTestBase {
          "\n" +
          "^\n" +
          "Encountered: EOF\n" +
-         "Expected: ALL, CASE, CAST, DATE, DEFAULT, DISTINCT, EXISTS, FALSE, 
GROUPING, " +
+         "Expected: ALL, CASE, CAST, DATE, DISTINCT, EXISTS, FALSE, GROUPING, 
" +
          "IF, INTERVAL, LEFT, NOT, NULL, REPLACE, RIGHT, " +
          "STRAIGHT_JOIN, TRUNCATE, TRUE, UNNEST, IDENTIFIER\n");
     ParserError("SELECT\n\n",
@@ -3876,7 +3877,7 @@ public class ParserTest extends FrontendTestBase {
          "\n" +
          "^\n" +
          "Encountered: EOF\n" +
-         "Expected: ALL, CASE, CAST, DATE, DEFAULT, DISTINCT, EXISTS, FALSE, 
GROUPING, " +
+         "Expected: ALL, CASE, CAST, DATE, DISTINCT, EXISTS, FALSE, GROUPING, 
" +
          "IF, INTERVAL, LEFT, NOT, NULL, REPLACE, RIGHT, " +
          "STRAIGHT_JOIN, TRUNCATE, TRUE, UNNEST, IDENTIFIER\n");
   }
@@ -4549,4 +4550,14 @@ public class ParserTest extends FrontendTestBase {
     ParsesOk("--test\nSELECT 1\n");
     ParsesOk("--test\nSELECT 1\n  ");
   }
+
+  @Test
+  public void TestUnreservedKeywords() {
+    // Test if "unreserved keywords" can be used as identifiers, such as table 
names and
+    // column names.
+    final String[] unreservedKeywords = { "DEFAULT", "KILL", "QUERY" };
+    for (String keyword : unreservedKeywords) {
+      ParsesOk(String.format("CREATE TABLE %s (%s INT);", keyword, keyword));
+    }
+  }
 }
diff --git a/tests/common/cluster_config.py b/tests/common/cluster_config.py
new file mode 100644
index 000000000..305d74a8c
--- /dev/null
+++ b/tests/common/cluster_config.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Common cluster configurations as decorators for custom cluster tests
+
+from __future__ import absolute_import, division, print_function
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
# Same as in tests/authorization/test_ranger.py
ADMIN = "admin"

# Decorator that starts the cluster with Ranger-based authorization enabled on
# both the impalads and the catalogd.
enable_authorization = CustomClusterTestSuite.with_args(
    # Same as IMPALAD_ARGS and CATALOGD_ARGS in tests/authorization/test_ranger.py
    impalad_args="--server-name=server1 --ranger_service_type=hive "
                 "--ranger_app_id=impala --authorization_provider=ranger",
    catalogd_args="--server-name=server1 --ranger_service_type=hive "
                  "--ranger_app_id=impala --authorization_provider=ranger"
)
+
+
def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
                                 proc_mem_limit=None, queue_wait_timeout_ms=None,
                                 admission_control_slots=None, executor_groups=None,
                                 codegen_cache_capacity=0):
  """Build the impalad startup flag string configuring default-pool admission
  control with the given request/queue/memory limits.

  Optional keyword arguments append their corresponding flag only when set;
  the codegen cache capacity flag is always emitted (0 disables the cache).
  """
  optional_flags = [
      ("mem_limit", proc_mem_limit),
      ("queue_wait_timeout_ms", queue_wait_timeout_ms),
      ("admission_control_slots", admission_control_slots),
      ("executor_groups", executor_groups),
  ]
  # Each emitted flag carries its own leading space, matching the template's
  # trailing "{3}" placeholder.
  extra = "".join(" -{0}={1}".format(name, value)
                  for name, value in optional_flags if value is not None)
  extra += " -codegen_cache_capacity={0}".format(codegen_cache_capacity)

  return ("-vmodule admission-controller=3 -default_pool_max_requests {0} "
          "-default_pool_max_queued {1} -default_pool_mem_limit {2} {3}".format(
            max_requests, max_queued, pool_max_mem, extra))
+
+
# Decorator limiting the default pool to one concurrently-admitted query
# (max one running, max one queued).
admit_one_query_at_a_time = CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(1, 1, 0)
)
# Decorator making the default pool reject every submitted query.
admit_no_query = CustomClusterTestSuite.with_args(
    impalad_args=impalad_admission_ctrl_flags(0, 0, 0)
)
# Decorator starting the cluster with exactly one exclusive coordinator.
single_coordinator = CustomClusterTestSuite.with_args(
    num_exclusive_coordinators=1
)
diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index c445d37f4..d0d3275b8 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -107,7 +107,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
   TMP_DIRS = dict()
 
   # Args for cluster startup/teardown when sharing a single cluster for the 
entire class.
-  SHARED_CLUSTER_ARGS = None
+  SHARED_CLUSTER_ARGS = {}
 
   @classmethod
   def get_workload(cls):
@@ -199,14 +199,35 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if log_symlinks:
       args[LOG_SYMLINKS] = True
 
def merge_args(args_first, args_last):
  """Merge two with_args() argument dicts into a new dict.

  Daemon flag strings are concatenated (flags appearing later on the command
  line take effect on the server); for any other duplicated key the value
  from 'args_last' wins.
  """
  merged = args_first.copy()
  for key, value in args_last.items():
    # Short-circuit keeps the key-name constants untouched for fresh keys.
    if key in merged and key in (
        IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS,
        START_ARGS, JVM_ARGS, KUDU_ARGS):
      # Let the server decide.
      merged[key] = " ".join((merged[key], value))
    else:
      # New key, or last writer wins.
      merged[key] = value
  return merged
+
     def decorate(obj):
       """If obj is a class, set SHARED_CLUSTER_ARGS for setup/teardown_class. 
Otherwise
       add to the function __dict__ for setup/teardown_method."""
       if inspect.isclass(obj):
-        obj.SHARED_CLUSTER_ARGS = args
+        obj.SHARED_CLUSTER_ARGS = merge_args(obj.SHARED_CLUSTER_ARGS, args)
       else:
         obj.__dict__[WITH_ARGS_METHOD] = True
-        obj.__dict__.update(args)
+        obj.__dict__ = merge_args(obj.__dict__, args)
       return obj
     return decorate
 
diff --git a/tests/common/impala_connection.py 
b/tests/common/impala_connection.py
index bc81ebb76..262eaaa35 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -89,6 +89,17 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)):
   def __exit__(self, exc_type, exc_value, traceback):
     self.close()
 
  @abc.abstractmethod
  def get_test_protocol(self):
    """Return the client protocol name that is specific to the Impala test
    framework. Possible return values are 'beeswax', 'hs2', or 'hs2-http'."""
    pass
+
  @abc.abstractmethod
  def get_host_port(self):
    """Return the 'host:port' string of the Impala server that this connection
    object is connected to."""
    pass
+
   @abc.abstractmethod
   def set_configuration_option(self, name, value):
     """Sets a configuration option name to the given value"""
@@ -188,6 +199,12 @@ class BeeswaxConnection(ImpalaConnection):
     self.__host_port = host_port
     self.QUERY_STATES = self.__beeswax_client.query_states
 
  def get_test_protocol(self):
    """Return the test-framework protocol name; always 'beeswax' here."""
    return 'beeswax'

  def get_host_port(self):
    """Return the 'host:port' string this connection was created with."""
    return self.__host_port
+
   def set_configuration_option(self, name, value):
     # Only set the option if it's not already set to the same value.
     if self.__beeswax_client.get_query_option(name) != value:
@@ -287,6 +304,9 @@ class BeeswaxConnection(ImpalaConnection):
     handle_id = self.handle_id(operation_handle)
     LOG.info("-- {0}: {1}".format(handle_id, message))
 
  def get_query_id(self, operation_handle):
    # The Beeswax handle carries the Impala query id directly on the wrapped
    # handle object.
    return operation_handle.get_handle().id
+
 
 class ImpylaHS2Connection(ImpalaConnection):
   """Connection to Impala using the impyla client connecting to HS2 endpoint.
@@ -316,6 +336,15 @@ class ImpylaHS2Connection(ImpalaConnection):
     # profile and log from Impala.
     self._collect_profile_and_log = collect_profile_and_log
 
+  def get_test_protocol(self):
+    if self.__http_path:
+      return 'hs2-http'
+    else:
+      return 'hs2'
+
  def get_host_port(self):
    """Return the 'host:port' string this connection was created with."""
    return self.__host_port
+
   def set_configuration_option(self, name, value):
     self.__query_options[name] = str(value)
 
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index 71a204b6c..83c6b09ed 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -34,6 +34,7 @@ from time import sleep, time
 from beeswaxd.BeeswaxService import QueryState
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.cluster_config import impalad_admission_ctrl_flags
 from tests.common.custom_cluster_test_suite import (
     ADMISSIOND_ARGS,
     IMPALAD_ARGS,
@@ -143,26 +144,6 @@ INITIAL_QUEUE_REASON_REGEX = \
 RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", 
"resources")
 
 
-def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
-                                 proc_mem_limit=None, 
queue_wait_timeout_ms=None,
-                                 admission_control_slots=None, 
executor_groups=None,
-                                 codegen_cache_capacity=0):
-  extra_flags = ""
-  if proc_mem_limit is not None:
-    extra_flags += " -mem_limit={0}".format(proc_mem_limit)
-  if queue_wait_timeout_ms is not None:
-    extra_flags += " -queue_wait_timeout_ms={0}".format(queue_wait_timeout_ms)
-  if admission_control_slots is not None:
-    extra_flags += " 
-admission_control_slots={0}".format(admission_control_slots)
-  if executor_groups is not None:
-    extra_flags += " -executor_groups={0}".format(executor_groups)
-  extra_flags += " -codegen_cache_capacity={0}".format(codegen_cache_capacity)
-
-  return ("-vmodule admission-controller=3 -default_pool_max_requests {0} "
-          "-default_pool_max_queued {1} -default_pool_mem_limit {2} 
{3}".format(
-            max_requests, max_queued, pool_max_mem, extra_flags))
-
-
 def impalad_admission_ctrl_config_args(fs_allocation_file, llama_site_file,
                                         additional_args="", make_copy=False):
   fs_allocation_path = os.path.join(RESOURCES_DIR, fs_allocation_file)
diff --git a/tests/custom_cluster/test_kill_query.py 
b/tests/custom_cluster/test_kill_query.py
new file mode 100644
index 000000000..1e035a75a
--- /dev/null
+++ b/tests/custom_cluster/test_kill_query.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+
+import pytest
+
+import tests.common.cluster_config as cluster_config
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.test_result_verifier import error_msg_expected
+from tests.util.cancel_util import (
+    QueryToKill,
+    assert_kill_error,
+    assert_kill_ok
+)
+
+
class TestKillQuery(CustomClusterTestSuite):
  """Custom-cluster tests for the KILL QUERY statement: unreachable
  coordinators, single-coordinator clusters, and admission control."""

  @pytest.mark.execute_serially
  def test_coordinator_unreachable(self):
    """
    The coordinator of the query to kill is unreachable.

    It is required that each impalad in the cluster is a coordinator.
    """
    protocol = 'hs2'
    with self.create_client_for_nth_impalad(0, protocol) as client, \
        QueryToKill(
            self,
            protocol,
            check_on_exit=False,
            nth_impalad=2) as query_id_to_kill:
      # Kill the impalad that coordinates the query; the broadcast kill
      # request from impalad 0 then fails with a network error.
      coordinator_to_kill = self.cluster.impalads[2]
      coordinator_to_kill.kill()
      assert_kill_error(
          client,
          "KillQuery() RPC failed: Network error:",
          query_id=query_id_to_kill,
      )

  @pytest.mark.execute_serially
  def test_another_coordinator_unreachable(self):
    """
    A coordinator other than the one of the query to kill is unreachable.

    It is required that each impalad in the cluster is a coordinator.
    """
    protocol = 'hs2'
    with self.create_client_for_nth_impalad(0, protocol) as client, \
        QueryToKill(self, protocol, nth_impalad=2) as query_id_to_kill:
      coordinator_to_kill = self.cluster.impalads[1]  # impalad 1 is between 0 and 2.
      coordinator_to_kill.kill()
      # The kill still succeeds because the query's own coordinator (2) is up.
      assert_kill_ok(client, query_id_to_kill)

  @pytest.mark.execute_serially
  @cluster_config.single_coordinator
  def test_single_coordinator(self):
    """
    Test when there is only one coordinator in the cluster.
    """
    protocol = 'hs2'
    with self.create_client_for_nth_impalad(0, protocol) as client:
      # No coordinator knows this made-up query id.
      assert_kill_error(
          client,
          "Could not find query on any coordinator.",
          query_id='123:456')

  @pytest.mark.execute_serially
  @cluster_config.admit_one_query_at_a_time
  def test_admit_one_query_at_a_time(self):
    """
    Make sure queries can be killed when only one query is allowed to run at a time.
    """
    protocol = 'hs2'
    with self.create_client_for_nth_impalad(0, protocol) as client, \
        QueryToKill(self, protocol) as query_id_to_kill:
      assert_kill_ok(client, query_id_to_kill)

  @pytest.mark.execute_serially
  @cluster_config.admit_no_query
  def test_admit_no_query(self):
    """
    Make sure KILL QUERY statement can be executed when no query will be admitted.

    This is to show that KILL QUERY statements are not subject to admission control.
    """
    protocol = 'hs2'
    with self.create_client_for_nth_impalad(0, protocol) as client:
      try:
        client.execute("SELECT 1")
      except Exception as e:
        expected_msg = (
            "Rejected query from pool default-pool: "
            "disabled by requests limit set to 0"
        )
        assert error_msg_expected(str(e), expected_msg)
      # NOTE(review): if the SELECT unexpectedly succeeds, nothing fails here;
      # consider asserting that the rejection actually happened.
      # The KILL QUERY statement bypasses admission control, so it executes
      # and reports the made-up query id as unknown.
      assert_kill_error(
          client,
          "Could not find query on any coordinator",
          query_id="123:456"
      )
+
+
@cluster_config.enable_authorization
class TestKillQueryAuthorization(CustomClusterTestSuite):
  """Verifies that only an admin or the owning user may kill a query when
  Ranger authorization is enabled."""

  @pytest.mark.execute_serially
  def test_kill_as_admin(self):
    # ImpylaHS2Connection does not support authentication yet.
    protocol = 'beeswax'
    with self.create_client_for_nth_impalad(0, protocol) as client, \
        QueryToKill(self, protocol, user="user1") as query_id_to_kill:
      # An admin may kill any user's query.
      assert_kill_ok(client, query_id_to_kill, user=cluster_config.ADMIN)

  @pytest.mark.execute_serially
  def test_kill_as_non_admin(self):
    # ImpylaHS2Connection does not support authentication yet.
    protocol = 'beeswax'
    user1, user2 = "user1", "user2"
    with self.create_client_for_nth_impalad(0, protocol) as user1_client, \
        self.create_client_for_nth_impalad(0, protocol) as user2_client, \
        QueryToKill(self, protocol, user=user1) as query_id_to_kill:
      # A non-owner, non-admin user must be rejected...
      assert_kill_error(
          user2_client,
          "User '{0}' is not authorized to kill the query.".format(user2),
          query_id=query_id_to_kill,
          user=user2,
      )
      # ...while the owner may kill their own query.
      assert_kill_ok(user1_client, query_id_to_kill, user=user1)
diff --git a/tests/query_test/test_cancellation.py 
b/tests/query_test/test_cancellation.py
index 21fc59497..342a39f03 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -82,6 +82,11 @@ SORT_BUFFER_POOL_LIMIT = ['0', '300m'] # Test spilling and 
non-spilling sorts.
 # Test with and without multithreading
 MT_DOP_VALUES = [0, 4]
 
+# Use KILL QUERY statement or not.
+# False: Send the Thrift RPCs directly.
+# True: Execute a KILL QUERY statement.
+USE_KILL_QUERY_STATEMENT = [False, True]
+
 class TestCancellation(ImpalaTestSuite):
   @classmethod
   def get_workload(self):
@@ -108,6 +113,8 @@ class TestCancellation(ImpalaTestSuite):
         ImpalaTestDimension('cpu_limit_s', *CPU_LIMIT_S))
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('mt_dop', *MT_DOP_VALUES))
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('use_kill_query_statement', 
*USE_KILL_QUERY_STATEMENT))
 
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: v.get_value('query_type') != 'CTAS' or (\
@@ -119,6 +126,11 @@ class TestCancellation(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: not (v.get_value('query_type') == 'CTAS' and
             v.get_value('query').startswith('compute stats')))
+    # 'use_kill_query_statement' and 'join_before_close' cannot be both True, 
since
+    # the KILL QUERY statement will also close the query.
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: not (v.get_value('use_kill_query_statement')
+            and v.get_value('join_before_close')))
 
     # Ignore CTAS on Kudu if there is no PRIMARY KEY specified.
     cls.ImpalaTestMatrix.add_constraint(
@@ -167,7 +179,8 @@ class TestCancellation(ImpalaTestSuite):
     for i in range(NUM_CANCELATION_ITERATIONS):
       cancel_query_and_validate_state(self.client, query,
           vector.get_value('exec_option'), vector.get_value('table_format'),
-          vector.get_value('cancel_delay'), 
vector.get_value('join_before_close'))
+          vector.get_value('cancel_delay'), 
vector.get_value('join_before_close'),
+          
use_kill_query_statement=vector.get_value('use_kill_query_statement'))
 
       if query_type == "CTAS":
         self.cleanup_test_table(vector.get_value('table_format'))
diff --git a/tests/query_test/test_kill_query.py 
b/tests/query_test/test_kill_query.py
new file mode 100644
index 000000000..c456c3fe5
--- /dev/null
+++ b/tests/query_test/test_kill_query.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_dimensions import create_client_protocol_dimension
+from tests.util.cancel_util import (
+    QueryToKill,
+    assert_kill_error,
+    assert_kill_ok
+)
+
+
class TestKillQuery(ImpalaTestSuite):
  """End-to-end tests for the KILL QUERY statement on a regular cluster,
  run once per client protocol."""

  @classmethod
  def add_test_dimensions(cls):
    # Exercise every client protocol (beeswax, hs2, hs2-http).
    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())

  def test_same_coordinator(self, vector):
    # Kill a query through the same impalad that coordinates it.
    protocol = vector.get_value('protocol')
    with self.create_client_for_nth_impalad(0, protocol) as client, \
        QueryToKill(self, protocol) as query_id_to_kill:
      assert_kill_ok(client, query_id_to_kill)

  def test_different_coordinator(self, vector):
    # Kill a query through an impalad other than its coordinator; the kill
    # request must reach the right coordinator.
    protocol = vector.get_value('protocol')
    with self.create_client_for_nth_impalad(1, protocol) as client, \
        QueryToKill(self, protocol, nth_impalad=0) as query_id_to_kill:
      assert_kill_ok(client, query_id_to_kill)

  def test_invalid_query_id(self, vector):
    # Malformed query ids are rejected at parse or analysis time.
    with self.create_impala_client(protocol=vector.get_value('protocol')) as client:
      # Unquoted id: syntax error.
      assert_kill_error(
          client,
          "ParseException: Syntax error",
          sql="KILL QUERY 123:456",
      )
      assert_kill_error(
          client,
          "AnalysisException: Invalid query id: ''",
          sql="KILL QUERY ''",
      )
      assert_kill_error(
          client,
          "AnalysisException: Invalid query id: 'f:g'",
          sql="KILL QUERY 'f:g'",
      )
      assert_kill_error(
          client,
          "AnalysisException: Invalid query id: '123'",
          sql="KILL QUERY '123'",
      )

  def test_idempotence(self, vector):
    # Killing an already-killed query consistently reports "not found"
    # rather than failing in some other way.
    protocol = vector.get_value('protocol')
    with self.create_impala_client(protocol=protocol) as client, \
        QueryToKill(self, protocol) as query_id_to_kill:
      assert_kill_ok(client, query_id_to_kill)
      assert_kill_error(
          client,
          "Could not find query on any coordinator",
          query_id=query_id_to_kill,
      )
      assert_kill_error(
          client,
          "Could not find query on any coordinator",
          query_id=query_id_to_kill,
      )
diff --git a/tests/util/cancel_util.py b/tests/util/cancel_util.py
index d2f011824..5957ce757 100644
--- a/tests/util/cancel_util.py
+++ b/tests/util/cancel_util.py
@@ -19,18 +19,91 @@ from __future__ import absolute_import, division, 
print_function
 import threading
 from time import sleep
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_connection import create_connection
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_result_verifier import error_msg_expected
+
+
class QueryToKill:
  """Context manager that starts a long-running query on the chosen impalad
  and yields its query id so a test can kill it from another client.

  On exit (unless 'check_on_exit' is False) it verifies that the query was
  indeed cancelled or unregistered, then closes the client.
  """

  def __init__(self, test_suite, protocol, check_on_exit=True, user=None, nth_impalad=0):
    # Client connected to the impalad that will coordinate the query to kill.
    self.client = test_suite.create_client_for_nth_impalad(nth_impalad, protocol)
    # A query that runs long enough for the test to kill it first.
    self.sql = 'SELECT sleep(1000)'
    self.check_on_exit = check_on_exit
    self.user = user

  def poll(self):
    """Fetch results in a loop until the query fails (killed) or returns rows.

    The terminating condition is recorded in 'self.exc' for __exit__ to
    inspect; nothing propagates out of the polling thread.
    """
    while True:
      try:
        results = self.client.fetch(self.sql, self.handle)
      except Exception as e:
        # Expected path: the kill cancels/unregisters the query, so the
        # fetch fails.
        self.exc = e
        return
      # Keep the 'try' body minimal so this failure marker is not swallowed
      # by the broad 'except' above: a fetch that returns rows means the
      # query finished before it could be killed.
      if len(results.data) > 0:
        self.exc = Exception("Failed to kill query within time limit.")
        return

  def __enter__(self):
    self.handle = self.client.execute_async(self.sql, user=self.user)
    self.poll_thread = threading.Thread(target=self.poll)
    self.poll_thread.start()
    return self.client.get_query_id(self.handle)

  def __exit__(self, exc_type, exc_value, traceback):  # noqa: U100
    self.poll_thread.join()
    if not self.check_on_exit:
      self.client.close()
      return
    # If ImpalaServer::UnregisterQuery() happens before the last polling, the
    # error message will be "Invalid or unknown query handle". Otherwise, the
    # error message will be "Cancelled".
    query_id = self.client.get_query_id(self.handle)
    assert error_msg_expected(
        str(self.exc),
        "Invalid or unknown query handle",
        query_id,
    ) or error_msg_expected(
        str(self.exc),
        "Cancelled",
        query_id,
    )
    # After the kill, any further fetch must report an unknown handle.
    try:
      self.client.fetch(self.sql, self.handle)
    except Exception as ex:
      assert "Invalid or unknown query handle" in str(ex)
    finally:
      self.client.close()
+
+
def assert_kill_ok(client, query_id, user=None):
  """Run KILL QUERY for 'query_id' as 'user' and assert that it succeeds
  with the expected single-row confirmation message."""
  statement = "KILL QUERY '{0}'".format(query_id)
  result = client.execute(statement, user=user)
  assert result.success and len(result.data) == 1
  assert result.data[0] == "Query {0} is killed.".format(query_id)
+
+
def assert_kill_error(client, error_msg, query_id=None, sql=None, user=None):
  """Execute 'sql' (defaulting to a KILL QUERY statement for 'query_id') and
  assert that it fails with an error containing 'error_msg'.

  Exactly one of 'query_id' and 'sql' should be provided.
  """
  if sql is None:
    sql = "KILL QUERY '{0}'".format(query_id)
  try:
    client.execute(sql, user=user)
  except Exception as exc:
    assert error_msg_expected(str(exc), error_msg)
  else:
    # The failure marker lives in 'else' so the broad 'except' above cannot
    # swallow this AssertionError and replace its message with a confusing
    # error_msg_expected() mismatch.
    assert False, "Failed to catch the exception."
 
 
 def cancel_query_and_validate_state(client, query, exec_option, table_format,
-    cancel_delay, join_before_close=False):
+    cancel_delay, join_before_close=False, use_kill_query_statement=False):
   """Runs the given query asynchronously and then cancels it after the 
specified delay.
   The query is run with the given 'exec_options' against the specified 
'table_format'. A
   separate async thread is launched to fetch the results of the query. The 
method
   validates that the query was successfully cancelled and that the error 
messages for the
   calls to ImpalaConnection#fetch and #close are consistent. If 
'join_before_close' is
   True the method will join against the fetch results thread before closing 
the query.
+
+  If 'use_kill_query_statement' is True and 'join_before_close' is False, a 
KILL QUERY
+  statement will be executed to cancel and close the query, instead of sending 
the Thrift
+  RPCs directly.
   """
+  assert not (join_before_close and use_kill_query_statement)
+
   if table_format: ImpalaTestSuite.change_database(client, table_format)
   if exec_option: client.set_configuration(exec_option)
   handle = client.execute_async(query)
@@ -51,18 +124,30 @@ def cancel_query_and_validate_state(client, query, 
exec_option, table_format,
           if "Query Status:" in line:
               error_msg += line
       assert False, error_msg
-  cancel_result = client.cancel(handle)
-  assert cancel_result.status_code == 0,\
-      'Unexpected status code from cancel request: %s' % cancel_result
+  if use_kill_query_statement:
+    with create_connection(
+        host_port=client.get_host_port(),
+        protocol=client.get_test_protocol(),
+    ) as kill_client:
+      kill_client.connect()
+      if exec_option:
+        kill_client.set_configuration(exec_option)
+      assert_kill_ok(kill_client, client.get_query_id(handle))
+  else:
+    cancel_result = client.cancel(handle)
+    assert cancel_result.status_code == 0, \
+        'Unexpected status code from cancel request: %s' % cancel_result
 
   if join_before_close:
     thread.join()
 
   close_error = None
-  try:
-    client.close_query(handle)
-  except ImpalaBeeswaxException as e:
-    close_error = e
+  # The KILL QUERY statement will also close the query.
+  if not use_kill_query_statement:
+    try:
+      client.close_query(handle)
+    except ImpalaBeeswaxException as e:
+      close_error = e
 
   # Before accessing fetch_results_error we need to join the fetch thread
   thread.join()


Reply via email to