This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new d7ee509e9 IMPALA-12648: Add KILL QUERY statement
d7ee509e9 is described below
commit d7ee509e937de0314e423326f8aeeffe0a49b6fb
Author: Xuebin Su <[email protected]>
AuthorDate: Tue Oct 15 11:04:45 2024 +0800
IMPALA-12648: Add KILL QUERY statement
To support killing queries programmatically, this patch adds a new
type of SQL statement, called the KILL QUERY statement, to cancel and
unregister a query on any coordinator in the cluster.
A KILL QUERY statement looks like
```
KILL QUERY '123:456';
```
where `123:456` is the query id of the query we want to kill. We follow
syntax from HIVE-17483. For backward compatibility, 'KILL' and 'QUERY'
are added as "unreserved keywords", like 'DEFAULT'. This allows the
three keywords to be used as identifiers.
A user is authorized to kill a query only if the user is an admin or is
the owner of the query. KILL QUERY statements are not affected by
admission control.
Implementation:
Since we don't know in advance which impalad is the coordinator of the
query we want to kill, we need to broadcast the kill request to all the
coordinators in the cluster. Upon receiving a kill request, each
coordinator checks whether it is the coordinator of the query:
- If yes, it cancels and unregisters the query,
- If no, it reports "Invalid or unknown query handle".
Currently, a KILL QUERY statement is not interruptible. IMPALA-13663 is
created for this.
For authorization, this patch adds a custom handler of
AuthorizationException for each statement to allow the exception to be
handled by the backend. This is because we don't know whether the user
is the owner of the query until we reach its coordinator.
To support cancelling child queries, this patch changes
ChildQuery::Cancel() to bypass the HS2 layer so that the session of the
child query will not be added to the connection used to execute the
KILL QUERY statement.
Testing:
- A new ParserTest case is added to test using "unreserved keywords" as
identifiers.
- New E2E test cases are added for the KILL QUERY statement.
- Added a new dimension in TestCancellation to use the KILL QUERY
statement.
- Added file tests/common/cluster_config.py and made
CustomClusterTestSuite.with_args() composable so that common cluster
configs can be reused in custom cluster tests.
Change-Id: If12d6e47b256b034ec444f17c7890aa3b40481c0
Reviewed-on: http://gerrit.cloudera.org:8080/21930
Reviewed-by: Riza Suminto <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Michael Smith <[email protected]>
---
be/src/service/child-query.cc | 32 +--
be/src/service/client-request-state.cc | 115 ++++++++++
be/src/service/client-request-state.h | 12 ++
be/src/service/control-service.cc | 21 ++
be/src/service/control-service.h | 9 +
be/src/service/impala-server.cc | 25 +++
be/src/service/impala-server.h | 6 +
common/protobuf/control_service.proto | 18 ++
common/thrift/Frontend.thrift | 14 ++
common/thrift/Types.thrift | 1 +
fe/src/main/cup/sql-parser.cup | 233 ++++++++++++---------
.../apache/impala/analysis/AnalysisContext.java | 9 +
.../org/apache/impala/analysis/KillQueryStmt.java | 80 +++++++
.../org/apache/impala/analysis/StatementBase.java | 24 +++
.../authorization/BaseAuthorizationChecker.java | 9 +-
.../java/org/apache/impala/service/Frontend.java | 6 +
fe/src/main/jflex/sql-scanner.flex | 2 +
.../org/apache/impala/analysis/ParserTest.java | 33 ++-
tests/common/cluster_config.py | 65 ++++++
tests/common/custom_cluster_test_suite.py | 27 ++-
tests/common/impala_connection.py | 29 +++
tests/custom_cluster/test_admission_controller.py | 21 +-
tests/custom_cluster/test_kill_query.py | 142 +++++++++++++
tests/query_test/test_cancellation.py | 15 +-
tests/query_test/test_kill_query.py | 83 ++++++++
tests/util/cancel_util.py | 101 ++++++++-
26 files changed, 973 insertions(+), 159 deletions(-)
diff --git a/be/src/service/child-query.cc b/be/src/service/child-query.cc
index 78c94a01e..d2d783f59 100644
--- a/be/src/service/child-query.cc
+++ b/be/src/service/child-query.cc
@@ -149,26 +149,28 @@ void ChildQuery::Cancel() {
if (!is_running_) return;
is_running_ = false;
}
- TUniqueId session_id;
+ TUniqueId query_id;
TUniqueId secret_unused;
// Ignore return statuses because they are not actionable.
- Status status =
ImpalaServer::THandleIdentifierToTUniqueId(hs2_handle_.operationId,
- &session_id, &secret_unused);
+ Status status = ImpalaServer::THandleIdentifierToTUniqueId(
+ hs2_handle_.operationId, &query_id, &secret_unused);
if (status.ok()) {
- VLOG_QUERY << "Cancelling and closing child query with operation id: " <<
- PrintId(session_id);
+ VLOG_QUERY << "Cancelling and closing child query with operation id: "
+ << PrintId(query_id);
} else {
- VLOG_QUERY << "Cancelling and closing child query. Failed to get query id:
" <<
- status;
+ VLOG_QUERY << "Cancelling and closing child query. Failed to get query id:
"
+ << status;
+ }
+ // Bypass the HS2 layer so that the session of the child query will not be
added to
+ // the connection of the caller of ChildQuery::Cancel().
+ status = parent_server_->CancelInternal(query_id);
+ if (!status.ok()) {
+ LOG(ERROR) << "Failed to cancel child query: " << status.GetDetail();
+ }
+ status = parent_server_->UnregisterQuery(query_id, true, &Status::CANCELLED);
+ if (!status.ok()) {
+ LOG(ERROR) << "Failed to unregister child query: " << status.GetDetail();
}
- TCancelOperationResp cancel_resp;
- TCancelOperationReq cancel_req;
- cancel_req.operationHandle = hs2_handle_;
- parent_server_->CancelOperation(cancel_resp, cancel_req);
- TCloseOperationResp close_resp;
- TCloseOperationReq close_req;
- close_req.operationHandle = hs2_handle_;
- parent_server_->CloseOperation(close_resp, close_req);
}
Status ChildQuery::IsCancelled() {
diff --git a/be/src/service/client-request-state.cc
b/be/src/service/client-request-state.cc
index eedef67a7..08999a55c 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -359,6 +359,10 @@ Status ClientRequestState::Exec() {
case TStmtType::UNKNOWN:
DCHECK(false);
return Status("Exec request uninitialized during execution");
+ case TStmtType::KILL:
+ DCHECK(exec_req.__isset.kill_query_request);
+ LOG_AND_RETURN_IF_ERROR(ExecKillQueryRequest());
+ break;
default:
return Status(Substitute("Unknown exec request stmt type: $0",
exec_req.stmt_type));
}
@@ -2460,6 +2464,117 @@ void ClientRequestState::ExecMigrateRequestImpl() {
}
}
+Status ClientRequestState::TryKillQueryLocally(
+ const TUniqueId& query_id, const string& requesting_user, bool is_admin) {
+ Status status = ExecEnv::GetInstance()->impala_server()->KillQuery(
+ query_id, requesting_user, is_admin);
+ if (status.ok()) {
+ SetResultSet({Substitute("Query $0 is killed.", PrintId(query_id))});
+ return query_status_;
+ }
+ return status;
+}
+
+Status ClientRequestState::TryKillQueryRemotely(
+ const TUniqueId& query_id, const KillQueryRequestPB& request) {
+ // The initial status should be INVALID_QUERY_HANDLE so that if there is no
other
+ // coordinator in the cluster, it will be the status to return.
+ Status status = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE,
PrintId(query_id));
+ ExecutorGroup all_coordinators =
+
ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot()->GetCoordinators();
+ // Skipping the current impalad.
+ unique_ptr<ExecutorGroup>
other_coordinators{ExecutorGroup::GetFilteredExecutorGroup(
+ &all_coordinators, {ExecEnv::GetInstance()->krpc_address()})};
+ // If we get an RPC error, instead of returning immediately, we record it
and move
+ // on to the next coordinator.
+ Status rpc_errors = Status::OK();
+ for (const auto& backend : other_coordinators->GetAllExecutorDescriptors()) {
+ // The logic here is similar to ExecShutdownRequest()
+ NetworkAddressPB krpc_addr = MakeNetworkAddressPB(backend.ip_address(),
+ backend.address().port(), backend.backend_id(),
+ ExecEnv::GetInstance()->rpc_mgr()->GetUdsAddressUniqueId());
+ VLOG_QUERY << "Sending KillQuery() RPC to " <<
NetworkAddressPBToString(krpc_addr);
+ unique_ptr<ControlServiceProxy> proxy;
+ Status get_proxy_status =
+ ControlService::GetProxy(krpc_addr, backend.address().hostname(),
&proxy);
+ if (!get_proxy_status.ok()) {
+ Status get_proxy_status_to_report{Substitute(
+ "KillQuery: Could not get Proxy to ControlService at $0 with error:
$1.",
+ NetworkAddressPBToString(krpc_addr), get_proxy_status.msg().msg())};
+ rpc_errors.MergeStatus(get_proxy_status_to_report);
+ LOG(ERROR) << get_proxy_status_to_report.GetDetail();
+ continue;
+ }
+ KillQueryResponsePB response;
+ const int num_retries = 3;
+ const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
+ const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
+ // Currently, a KILL QUERY statement is not interruptible.
+ Status rpc_status = RpcMgr::DoRpcWithRetry(proxy,
&ControlServiceProxy::KillQuery,
+ request, &response, query_ctx_, "KillQuery() RPC failed", num_retries,
timeout_ms,
+ backoff_time_ms, "CRS_KILL_QUERY_RPC");
+ if (!rpc_status.ok()) {
+ LOG(ERROR) << rpc_status.GetDetail();
+ rpc_errors.MergeStatus(rpc_status);
+ continue;
+ }
+ // Currently, we only support killing one query in one KILL QUERY
statement.
+ DCHECK_EQ(response.statuses_size(), 1);
+ status = Status(response.statuses(0));
+ if (status.ok()) {
+ // Kill succeeded.
+ VLOG_QUERY << "KillQuery: Found the coordinator at "
+ << NetworkAddressPBToString(krpc_addr);
+ SetResultSet({Substitute("Query $0 is killed.", PrintId(query_id))});
+ return query_status_;
+ } else if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
+ LOG(ERROR) << "KillQuery: Found the coordinator at "
+ << NetworkAddressPBToString(krpc_addr)
+ << " but failed to kill the query: "
+ << status.GetDetail();
+ // Kill failed, but we found the coordinator of the query.
+ return status;
+ }
+ }
+  // We didn't find the coordinator of the query after trying all other
coordinators.
+ // If there is any RPC error, return it.
+ if (!rpc_errors.ok()) {
+ return rpc_errors;
+ }
+ // If there is no RPC error, return INVALID_QUERY_HANDLE.
+ return status;
+}
+
+Status ClientRequestState::ExecKillQueryRequest() {
+ TUniqueId query_id = exec_request().kill_query_request.query_id;
+ string requesting_user = exec_request().kill_query_request.requesting_user;
+ bool is_admin = exec_request().kill_query_request.is_admin;
+
+ VLOG_QUERY << "Exec KillQuery: query_id=" << PrintId(query_id)
+ << ", requesting_user=" << requesting_user << ", is_admin=" <<
is_admin;
+
+ // First try cancelling the query locally.
+ Status status = TryKillQueryLocally(query_id, requesting_user, is_admin);
+ if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
+ return status;
+ }
+
+ // The current impalad is NOT the coordinator of the query. Now we have to
broadcast
+ // the kill request to all other coordinators.
+ UniqueIdPB query_id_pb;
+ TUniqueIdToUniqueIdPB(query_id, &query_id_pb);
+ KillQueryRequestPB request;
+ *request.add_query_ids() = query_id_pb;
+ *request.mutable_requesting_user() = requesting_user;
+ request.set_is_admin(is_admin);
+ status = TryKillQueryRemotely(query_id, request);
+ if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
+ return status;
+ }
+ // All the error messages are "Invalid or unknown query handle".
+ return Status("Could not find query on any coordinator.");
+}
+
void ClientRequestState::AddTableResetHints(const TConvertTableRequest& params,
Status* status) const {
string table_reset_hint("Your table might have been renamed. To reset the
name "
diff --git a/be/src/service/client-request-state.h
b/be/src/service/client-request-state.h
index e91f041c5..4c6f63c55 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -978,6 +978,9 @@ class ClientRequestState {
/// Core logic of executing a MIGRATE TABLE statement.
void ExecMigrateRequestImpl();
+ /// The logic of executing a KILL QUERY statement.
+ Status ExecKillQueryRequest();
+
/// Used when running into an error during table migration to extend
'status' with some
/// hints about how to reset the original table name. 'params' holds the SQL
query
/// string the user should run.
@@ -985,5 +988,14 @@ class ClientRequestState {
/// Adds the catalog execution timeline returned from catalog RPCs into the
profile.
void AddCatalogTimeline();
+
+ /// Try killing query with the given query_id locally as the requesting_user.
+ /// 'is_admin' is true if requesting_user is an admin.
+ Status TryKillQueryLocally(
+ const TUniqueId& query_id, const std::string& requesting_user, bool
is_admin);
+
+ /// Try to ask other coordinators to kill query by sending the request.
+ Status TryKillQueryRemotely(
+ const TUniqueId& query_id, const KillQueryRequestPB& request);
};
}
diff --git a/be/src/service/control-service.cc
b/be/src/service/control-service.cc
index d20922458..ed52f28f7 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -221,6 +221,17 @@ void ControlService::RespondAndReleaseRpc(
rpc_context->RespondSuccess();
}
+template <typename ResponsePBType>
+void ControlService::RespondAndReleaseRpc(
+ const vector<Status>& statuses, ResponsePBType* response, RpcContext*
rpc_context) {
+ for (int i = 0; i < statuses.size(); ++i) {
+ statuses[i].ToProto(response->add_statuses());
+ }
+ // Release the memory against the control service's memory tracker.
+ mem_tracker_->Release(rpc_context->GetTransferSize());
+ rpc_context->RespondSuccess();
+}
+
void ControlService::CancelQueryFInstances(const
CancelQueryFInstancesRequestPB* request,
CancelQueryFInstancesResponsePB* response, RpcContext* rpc_context) {
DCHECK(request->has_query_id());
@@ -249,4 +260,14 @@ void ControlService::RemoteShutdown(const
RemoteShutdownParamsPB* req,
RespondAndReleaseRpc(status, response, rpc_context);
}
+
+void ControlService::KillQuery(const KillQueryRequestPB* request,
+ KillQueryResponsePB* response, RpcContext* rpc_context) {
+ // Currently, we only support killing one query in one KILL QUERY statement.
+ DCHECK_EQ(request->query_ids_size(), 1);
+ const TUniqueId& query_id = ProtoToQueryId(request->query_ids(0));
+ Status status = ExecEnv::GetInstance()->impala_server()->KillQuery(
+ query_id, request->requesting_user(), request->is_admin());
+ RespondAndReleaseRpc(vector{status}, response, rpc_context);
+}
}
diff --git a/be/src/service/control-service.h b/be/src/service/control-service.h
index 2411fff6f..a38f22679 100644
--- a/be/src/service/control-service.h
+++ b/be/src/service/control-service.h
@@ -76,6 +76,10 @@ class ControlService : public ControlServiceIf {
virtual void RemoteShutdown(const RemoteShutdownParamsPB* req,
RemoteShutdownResultPB* response, ::kudu::rpc::RpcContext* context)
override;
+ /// Kill a query. The query will be unregistered.
+ virtual void KillQuery(const KillQueryRequestPB* request,
+ KillQueryResponsePB* response, ::kudu::rpc::RpcContext* rpc_context)
override;
+
/// Gets a ControlService proxy to a server with 'address' and 'hostname'.
/// The newly created proxy is returned in 'proxy'. Returns error status on
failure.
static Status GetProxy(const NetworkAddressPB& address, const std::string&
hostname,
@@ -97,6 +101,11 @@ class ControlService : public ControlServiceIf {
template <typename ResponsePBType>
void RespondAndReleaseRpc(
const Status& status, ResponsePBType* response, kudu::rpc::RpcContext*
rpc_context);
+
+  /// For RPCs whose responses have more than one Status.
+ template <typename ResponsePBType>
+ void RespondAndReleaseRpc(const vector<Status>& statuses, ResponsePBType*
response,
+ kudu::rpc::RpcContext* rpc_context);
};
} // namespace impala
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 75d92f495..15d43ebe5 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1821,6 +1821,31 @@ Status ImpalaServer::CancelInternal(const TUniqueId&
query_id) {
return Status::OK();
}
+Status ImpalaServer::KillQuery(
+ const TUniqueId& query_id, const string& requesting_user, bool is_admin) {
+ VLOG_QUERY << "KillQuery(): query_id=" << PrintId(query_id)
+ << ", requesting_user=" << requesting_user << ", is_admin=" <<
is_admin;
+ QueryHandle query_handle;
+ RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle));
+ if (!is_admin && requesting_user != query_handle->effective_user()) {
+ return Status(
+ Substitute("User '$0' is not authorized to kill the query.",
requesting_user));
+ }
+ Status status = CancelInternal(query_id);
+ if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
+ // The current impalad is the coordinator of the query.
+ RETURN_IF_ERROR(status);
+ status = UnregisterQuery(query_id, true, &Status::CANCELLED);
+ if (status.ok() || status.code() == TErrorCode::INVALID_QUERY_HANDLE) {
+ // There might be another thread that has already unregistered the query
+ // before UnregisterQuery() and after CancelInternal(). In this case we
are done.
+ return Status::OK();
+ }
+ return status;
+ }
+ return status;
+}
+
Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
const SecretArg& secret, bool ignore_if_absent) {
DCHECK(secret.is_session_secret());
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 695587e1b..2af5cfc0e 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -722,6 +722,12 @@ class ImpalaServer : public ImpalaServiceIf,
static int64_t DecrementCount(
std::map<std::string, int64_t>& loads, const std::string& key);
+ /// Try cancelling and unregistering the query with the given query_id as the
+ /// requesting_user. 'is_admin' is true if requesting_user is an admin.
+ /// This method is used in executing a KILL QUERY statement.
+ Status KillQuery(
+ const TUniqueId& query_id, const std::string& requesting_user, bool
is_admin);
+
private:
struct ExpirationEvent;
class SecretArg;
diff --git a/common/protobuf/control_service.proto
b/common/protobuf/control_service.proto
index 02933b6f2..b91ebbc87 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -464,6 +464,21 @@ message ExecQueryFInstancesResponsePB {
optional StatusPB status = 1;
}
+message KillQueryRequestPB {
+ // The query ids of the queries to kill.
+ repeated UniqueIdPB query_ids = 1;
+
+ // The effective user who submitted this request.
+ required string requesting_user = 2;
+
+ // True if the requesting_user is an admin.
+ required bool is_admin = 3;
+}
+
+message KillQueryResponsePB {
+ repeated StatusPB statuses = 1;
+}
+
service ControlService {
// Override the default authorization method.
option (kudu.rpc.default_authz_method) = "Authorize";
@@ -485,4 +500,7 @@ service ControlService {
// Called to initiate shutdown of this backend.
rpc RemoteShutdown(RemoteShutdownParamsPB) returns (RemoteShutdownResultPB);
+
+ // Called by an impalad to ask another impalad to kill a query.
+ rpc KillQuery(KillQueryRequestPB) returns (KillQueryResponsePB);
}
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 400e4e11d..dd7ac335f 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -635,6 +635,17 @@ struct TConvertTableRequest {
12: optional string drop_temporary_hdfs_table_query
}
+// Request for a KILL QUERY statement.
+struct TKillQueryReq {
+ 1: required Types.TUniqueId query_id
+
+ // The effective user who submitted this request.
+ 2: required string requesting_user;
+
+ // True if the requesting_user is an admin.
+ 3: required bool is_admin;
+}
+
// Result of call to createExecRequest()
struct TExecRequest {
1: required Types.TStmtType stmt_type = TStmtType.UNKNOWN
@@ -716,6 +727,9 @@ struct TExecRequest {
// Columns referenced in an order by clause.
25: optional list<string> orderby_columns
+
+ // Request for "KILL QUERY" statements.
+ 26: optional TKillQueryReq kill_query_request
}
// Parameters to FeSupport.cacheJar().
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index 4e13a417f..0c951ee73 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -111,6 +111,7 @@ enum TStmtType {
TESTCASE = 7
CONVERT = 8
UNKNOWN = 9
+ KILL = 10
}
enum TIcebergOperation {
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 28785fb64..553a1a599 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -87,9 +87,24 @@ parser code {:
// TODO: remove when V1 code is dropped.
private TQueryOptions queryOptions;
+ // Whether the token is an "unreserved keyword".
+ // NOTE: This method should be kept in sync with the rule for
'ident_or_unreserved'.
+ private boolean isUnreservedKeyword(Integer tokenId) {
+ return (
+ tokenId == SqlParserSymbols.KW_DEFAULT ||
+ tokenId == SqlParserSymbols.KW_KILL ||
+ tokenId == SqlParserSymbols.KW_QUERY
+ );
+ }
+
// to avoid reporting trivial tokens as expected tokens in error messages
- private boolean reportExpectedToken(Integer tokenId, int numExpectedTokens) {
- if (SqlScanner.isKeyword(tokenId) ||
+ private boolean reportExpectedToken(
+ Integer tokenId, int numExpectedTokens, boolean identifierExpected) {
+ if (identifierExpected && isUnreservedKeyword(tokenId)) {
+ // Skip all "unreserved keywords" in the expected token list if
+ // 'IDENTIFIER' exists since unreserved keywords can be used as
identifiers.
+ return false;
+ } else if (SqlScanner.isKeyword(tokenId) ||
tokenId.intValue() == SqlParserSymbols.COMMA ||
tokenId.intValue() == SqlParserSymbols.IDENT) {
return true;
@@ -253,6 +268,12 @@ parser code {:
result.append("\n");
boolean identifierExpected = false;
+ for (Integer tokenId : expectedTokenIds_) {
+ if (tokenId == SqlParserSymbols.IDENT) {
+ identifierExpected = true;
+ break;
+ }
+ }
// append expected tokens
result.append("Expected: ");
@@ -261,11 +282,8 @@ parser code {:
Integer tokenId = null;
for (int i = 0; i < expectedTokenIds_.size(); ++i) {
tokenId = expectedTokenIds_.get(i);
- if (reportExpectedToken(tokenId, expectedTokenIds_.size())) {
+ if (reportExpectedToken(tokenId, expectedTokenIds_.size(),
identifierExpected)) {
expectedToken = SqlScanner.tokenIdMap.get(tokenId);
-
if(expectedToken.equals(SqlScanner.tokenIdMap.get(SqlParserSymbols.IDENT))) {
- identifierExpected = true;
- }
result.append(expectedToken + ", ");
}
}
@@ -317,13 +335,13 @@ terminal
KW_GROUP, KW_GROUPING, KW_HASH, KW_HUDIPARQUET, KW_IGNORE, KW_HAVING,
KW_ICEBERG, KW_IF,
KW_ILIKE, KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT,
KW_INT,
KW_INTERMEDIATE, KW_INTERSECT, KW_INTERVAL, KW_INTO, KW_INVALIDATE,
KW_IREGEXP, KW_IS,
- KW_JDBC, KW_JOIN, KW_JSONFILE, KW_KUDU, KW_LAST, KW_LEADING, KW_LEFT,
KW_LEXICAL, KW_LIKE,
- KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_LOGICAL_OR,
KW_MANAGED_LOCATION, KW_MAP, KW_MATCHED,
- KW_MERGE, KW_MERGE_FN, KW_METADATA, KW_MINUS, KW_NON, KW_NORELY, KW_NOT,
+ KW_JDBC, KW_JOIN, KW_JSONFILE, KW_KILL, KW_KUDU, KW_LAST, KW_LEADING,
KW_LEFT, KW_LEXICAL,
+ KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_LOGICAL_OR,
KW_MANAGED_LOCATION, KW_MAP,
+ KW_MATCHED, KW_MERGE, KW_MERGE_FN, KW_METADATA, KW_MINUS, KW_NON, KW_NORELY,
KW_NOT,
KW_NOVALIDATE, KW_NULL, KW_NULLS, KW_OF, KW_OFFSET, KW_ON, KW_OPTIMIZE,
KW_OR,
KW_ORC, KW_ORDER, KW_OUTER,
KW_OVER, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, KW_PARTITION,
KW_PARTITIONED,
- KW_PARTITIONS, KW_PRECEDING, KW_PREPARE_FN, KW_PRIMARY, KW_PRODUCED,
KW_PURGE,
+ KW_PARTITIONS, KW_PRECEDING, KW_PREPARE_FN, KW_PRIMARY, KW_PRODUCED,
KW_PURGE, KW_QUERY,
KW_RANGE, KW_RCFILE, KW_RECOVER, KW_REFERENCES, KW_REFRESH, KW_REGEXP,
KW_RELY,
KW_RENAME, KW_REPEATABLE, KW_REPLACE, KW_REPLICATION, KW_RESTRICT,
KW_RETURNS,
KW_REVOKE, KW_RIGHT, KW_RLIKE, KW_ROLE, KW_ROLES, KW_ROLLUP, KW_ROW,
KW_ROWS, KW_RWSTORAGE,
@@ -355,10 +373,9 @@ terminal String UNMATCHED_STRING_LITERAL;
terminal String UNEXPECTED_CHAR;
terminal KW_CLUSTERED, KW_BUCKETS;
-// IMPALA-3726 introduced the DEFAULT keyword which could break existing
applications
-// that use the identifier "KEYWORD" as database, column or table names. To
avoid that,
-// the ident_or_default non-terminal is introduced and should be used instead
of IDENT.
-nonterminal String ident_or_default;
+// An identifier or an "unreserved keyword". An unreserved keyword is a
keyword that can
+// also be used as an identifier.
+nonterminal String ident_or_unreserved;
// A word is an arbitrary token composed of digits and at least one letter.
Reserved
// words cannot be used as identifiers but they are words and can be used in
query
// options, column attributes, etc.
@@ -637,6 +654,9 @@ nonterminal AdminFnStmt admin_fn_stmt;
// For "ALTER TABLE ... CONVERT TO" statements
nonterminal ConvertTableToIcebergStmt convert_tbl_stmt;
+// For "KILL QUERY" statements
+nonterminal KillQueryStmt kill_query_stmt;
+
precedence left KW_LOGICAL_OR;
precedence left KW_OR;
precedence left KW_AND;
@@ -794,6 +814,8 @@ stmt ::=
{: RESULT = s; :}
| convert_tbl_stmt: convert
{: RESULT = convert; :}
+ | kill_query_stmt: kill_query
+ {: RESULT = kill_query; :}
;
load_stmt ::=
@@ -825,7 +847,7 @@ reset_metadata_stmt ::=
{: RESULT = ResetMetadataStmt.createRefreshTableStmt(table); :}
| KW_REFRESH table_name:table partition_spec:partition
{: RESULT = ResetMetadataStmt.createRefreshPartitionStmt(table, partition);
:}
- | KW_REFRESH KW_FUNCTIONS ident_or_default:db
+ | KW_REFRESH KW_FUNCTIONS ident_or_unreserved:db
{: RESULT = ResetMetadataStmt.createRefreshFunctionsStmt(db); :}
| KW_REFRESH KW_AUTHORIZATION
{: RESULT = ResetMetadataStmt.createRefreshAuthorizationStmt(); :}
@@ -1147,34 +1169,34 @@ opt_kw_table ::=
show_roles_stmt ::=
KW_SHOW KW_ROLES
{: RESULT = new ShowRolesStmt(false, null); :}
- | KW_SHOW KW_ROLE KW_GRANT KW_GROUP ident_or_default:group
+ | KW_SHOW KW_ROLE KW_GRANT KW_GROUP ident_or_unreserved:group
{: RESULT = new ShowRolesStmt(false, group); :}
| KW_SHOW KW_CURRENT KW_ROLES
{: RESULT = new ShowRolesStmt(true, null); :}
;
show_grant_principal_stmt ::=
- KW_SHOW KW_GRANT principal_type:type ident_or_default:name
+ KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name
{: RESULT = new ShowGrantPrincipalStmt(name, type, null); :}
- | KW_SHOW KW_GRANT principal_type:type ident_or_default:name KW_ON
+ | KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name KW_ON
server_ident:server_kw
{:
RESULT = new ShowGrantPrincipalStmt(name, type,
PrivilegeSpec.createServerScopedPriv(TPrivilegeLevel.ALL));
:}
- | KW_SHOW KW_GRANT principal_type:type ident_or_default:name KW_ON
- KW_DATABASE ident_or_default:db_name
+ | KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name KW_ON
+ KW_DATABASE ident_or_unreserved:db_name
{:
RESULT = new ShowGrantPrincipalStmt(name, type,
PrivilegeSpec.createDbScopedPriv(TPrivilegeLevel.ALL, db_name));
:}
- | KW_SHOW KW_GRANT principal_type:type ident_or_default:name KW_ON KW_TABLE
+ | KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name KW_ON
KW_TABLE
table_name:tbl_name
{:
RESULT = new ShowGrantPrincipalStmt(name, type,
PrivilegeSpec.createTableScopedPriv(TPrivilegeLevel.ALL, tbl_name));
:}
- | KW_SHOW KW_GRANT principal_type:type ident_or_default:name KW_ON KW_COLUMN
+ | KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name KW_ON
KW_COLUMN
column_name:col_name
{:
RESULT = new ShowGrantPrincipalStmt(name, type,
@@ -1182,20 +1204,20 @@ show_grant_principal_stmt ::=
col_name.getTableName(),
Collections.singletonList(col_name.getColumnName())));
:}
- | KW_SHOW KW_GRANT principal_type:type ident_or_default:name KW_ON
uri_ident:uri_kw
+ | KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name KW_ON
uri_ident:uri_kw
STRING_LITERAL:uri
{:
RESULT = new ShowGrantPrincipalStmt(name, type,
PrivilegeSpec.createUriScopedPriv(TPrivilegeLevel.ALL, new
HdfsUri(uri)));
:}
- | KW_SHOW KW_GRANT principal_type:type ident_or_default:name KW_ON
KW_STORAGE_HANDLER_URI
+ | KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name KW_ON
KW_STORAGE_HANDLER_URI
STRING_LITERAL:storage_handler_uri
{:
RESULT = new ShowGrantPrincipalStmt(name, type,
PrivilegeSpec.createStorageHandlerUriScopedPriv(TPrivilegeLevel.RWSTORAGE,
new StorageHandlerUri(storage_handler_uri)));
:}
- | KW_SHOW KW_GRANT principal_type:type ident_or_default:name KW_ON KW_UDF
+ | KW_SHOW KW_GRANT principal_type:type ident_or_unreserved:name KW_ON KW_UDF
function_name:fn_name
{:
RESULT = new ShowGrantPrincipalStmt(name, type,
@@ -1204,38 +1226,38 @@ show_grant_principal_stmt ::=
;
create_drop_role_stmt ::=
- KW_CREATE KW_ROLE ident_or_default:role
+ KW_CREATE KW_ROLE ident_or_unreserved:role
{: RESULT = new CreateDropRoleStmt(role, false); :}
- | KW_DROP KW_ROLE ident_or_default:role
+ | KW_DROP KW_ROLE ident_or_unreserved:role
{: RESULT = new CreateDropRoleStmt(role, true); :}
;
grant_role_stmt ::=
- KW_GRANT KW_ROLE ident_or_default:role KW_TO KW_GROUP ident_or_default:group
+ KW_GRANT KW_ROLE ident_or_unreserved:role KW_TO KW_GROUP
ident_or_unreserved:group
{: RESULT = new GrantRevokeRoleStmt(role, group, true); :}
;
revoke_role_stmt ::=
- KW_REVOKE KW_ROLE ident_or_default:role KW_FROM KW_GROUP
ident_or_default:group
+ KW_REVOKE KW_ROLE ident_or_unreserved:role KW_FROM KW_GROUP
ident_or_unreserved:group
{: RESULT = new GrantRevokeRoleStmt(role, group, false); :}
;
// For backwards compatibility, a grant without the principal type will
default to
// TPrincipalType.ROLE
grant_privilege_stmt ::=
- KW_GRANT privilege_spec:priv KW_TO KW_ROLE ident_or_default:role
+ KW_GRANT privilege_spec:priv KW_TO KW_ROLE ident_or_unreserved:role
opt_with_grantopt:grant_opt
{: RESULT = new GrantRevokePrivStmt(role, priv, true, grant_opt,
TPrincipalType.ROLE); :}
- | KW_GRANT privilege_spec:priv KW_TO ident_or_default:role
+ | KW_GRANT privilege_spec:priv KW_TO ident_or_unreserved:role
opt_with_grantopt:grant_opt
{: RESULT = new GrantRevokePrivStmt(role, priv, true, grant_opt,
TPrincipalType.ROLE); :}
- | KW_GRANT privilege_spec:priv KW_TO IDENT:user_id ident_or_default:user
+ | KW_GRANT privilege_spec:priv KW_TO IDENT:user_id ident_or_unreserved:user
opt_with_grantopt:grant_opt
{:
parser.checkIdentKeyword("USER", user_id);
RESULT = new GrantRevokePrivStmt(user, priv, true, grant_opt,
TPrincipalType.USER);
:}
- | KW_GRANT privilege_spec:priv KW_TO KW_GROUP ident_or_default:group
+ | KW_GRANT privilege_spec:priv KW_TO KW_GROUP ident_or_unreserved:group
opt_with_grantopt:grant_opt
{: RESULT = new GrantRevokePrivStmt(group, priv, true, grant_opt,
TPrincipalType.GROUP); :}
;
@@ -1244,28 +1266,28 @@ grant_privilege_stmt ::=
// TPrincipalType.ROLE
revoke_privilege_stmt ::=
KW_REVOKE opt_grantopt_for:grant_opt privilege_spec:priv KW_FROM
- KW_ROLE ident_or_default:role
+ KW_ROLE ident_or_unreserved:role
{: RESULT = new GrantRevokePrivStmt(role, priv, false, grant_opt,
TPrincipalType.ROLE); :}
| KW_REVOKE opt_grantopt_for:grant_opt privilege_spec:priv KW_FROM
- ident_or_default:role
+ ident_or_unreserved:role
{: RESULT = new GrantRevokePrivStmt(role, priv, false, grant_opt,
TPrincipalType.ROLE); :}
| KW_REVOKE opt_grantopt_for:grant_opt privilege_spec:priv KW_FROM
- IDENT:user_id ident_or_default:user
+ IDENT:user_id ident_or_unreserved:user
{:
parser.checkIdentKeyword("USER", user_id);
RESULT = new GrantRevokePrivStmt(user, priv, false, grant_opt,
TPrincipalType.USER);
:}
| KW_REVOKE opt_grantopt_for:grant_opt privilege_spec:priv KW_FROM
- KW_GROUP ident_or_default:group
+ KW_GROUP ident_or_unreserved:group
{: RESULT = new GrantRevokePrivStmt(group, priv, false, grant_opt,
TPrincipalType.GROUP); :}
;
privilege_spec ::=
privilege:priv KW_ON server_ident:server_kw
{: RESULT = PrivilegeSpec.createServerScopedPriv(priv); :}
- | privilege:priv KW_ON server_ident:server_kw ident_or_default:server_name
+ | privilege:priv KW_ON server_ident:server_kw ident_or_unreserved:server_name
{: RESULT = PrivilegeSpec.createServerScopedPriv(priv, server_name); :}
- | privilege:priv KW_ON KW_DATABASE ident_or_default:db_name
+ | privilege:priv KW_ON KW_DATABASE ident_or_unreserved:db_name
{: RESULT = PrivilegeSpec.createDbScopedPriv(priv, db_name); :}
| privilege:priv KW_ON KW_TABLE table_name:tbl_name
{: RESULT = PrivilegeSpec.createTableScopedPriv(priv, tbl_name); :}
@@ -1346,7 +1368,7 @@ partition_def_list ::=
;
comment_on_stmt ::=
- KW_COMMENT KW_ON KW_DATABASE ident_or_default:db_name KW_IS
nullable_comment_val:comment
+ KW_COMMENT KW_ON KW_DATABASE ident_or_unreserved:db_name KW_IS
nullable_comment_val:comment
{: RESULT = new CommentOnDbStmt(db_name, comment); :}
| KW_COMMENT KW_ON KW_TABLE table_name:table KW_IS
nullable_comment_val:comment
{: RESULT = new CommentOnTableStmt(table, comment); :}
@@ -1360,15 +1382,15 @@ comment_on_stmt ::=
// such that any names that use OWNER or USER will need to be escaped. By
using IDENT
// token we can make OWNER and USER to be keywords only in these statements.
alter_db_stmt ::=
- KW_ALTER KW_DATABASE ident_or_default:db KW_SET IDENT:owner_id IDENT:user_id
- ident_or_default:user
+ KW_ALTER KW_DATABASE ident_or_unreserved:db KW_SET IDENT:owner_id
IDENT:user_id
+ ident_or_unreserved:user
{:
parser.checkIdentKeyword("OWNER", owner_id);
parser.checkIdentKeyword("USER", user_id);
RESULT = new AlterDbSetOwnerStmt(db, new Owner(TOwnerType.USER, user));
:}
- | KW_ALTER KW_DATABASE ident_or_default:db KW_SET IDENT:owner_id KW_ROLE
- ident_or_default:role
+ | KW_ALTER KW_DATABASE ident_or_unreserved:db KW_SET IDENT:owner_id KW_ROLE
+ ident_or_unreserved:role
{:
parser.checkIdentKeyword("OWNER", owner_id);
RESULT = new AlterDbSetOwnerStmt(db, new Owner(TOwnerType.ROLE, role));
@@ -1398,7 +1420,7 @@ alter_tbl_stmt ::=
| KW_ALTER KW_TABLE table_name:table KW_ADD if_not_exists_val:if_not_exists
partition_def_list:partitions KW_SET KW_FILEFORMAT
file_format_val:file_format
{: RESULT = new AlterTableAddPartitionStmt(table, if_not_exists, partitions,
file_format); :}
- | KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column
ident_or_default:col_name
+ | KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column
ident_or_unreserved:col_name
{: RESULT = new AlterTableDropColStmt(table, col_name); :}
| KW_ALTER KW_TABLE table_name:table KW_ADD if_not_exists_val:if_not_exists
KW_RANGE range_param:partition
@@ -1406,7 +1428,7 @@ alter_tbl_stmt ::=
RESULT = new AlterTableAddDropRangePartitionStmt(table, partition,
if_not_exists,
Operation.ADD);
:}
- | KW_ALTER KW_TABLE table_name:table KW_CHANGE opt_kw_column
ident_or_default:col_name
+ | KW_ALTER KW_TABLE table_name:table KW_CHANGE opt_kw_column
ident_or_unreserved:col_name
column_def:col_def
{: RESULT = AlterTableAlterColStmt.createChangeColStmt(table, col_name,
col_def); :}
| KW_ALTER KW_TABLE table_name:table KW_DROP if_exists_val:if_exists
@@ -1453,7 +1475,7 @@ alter_tbl_stmt ::=
RESULT = new AlterTableSortByStmt(table, col_names, TSortingOrder.ZORDER);
:}
| KW_ALTER KW_TABLE table_name:table opt_partition_set:partition KW_SET
- KW_COLUMN KW_STATS ident_or_default:col LPAREN properties_map:map RPAREN
+ KW_COLUMN KW_STATS ident_or_unreserved:col LPAREN properties_map:map RPAREN
{:
// See above for special partition clause handling.
if (partition != null) parser.parseError("set", SqlParserSymbols.KW_SET);
@@ -1471,17 +1493,17 @@ alter_tbl_stmt ::=
:}
| KW_ALTER KW_TABLE table_name:table KW_RECOVER KW_PARTITIONS
{: RESULT = new AlterTableRecoverPartitionsStmt(table); :}
- | KW_ALTER KW_TABLE table_name:table KW_ALTER opt_kw_column
ident_or_default:col_name
+ | KW_ALTER KW_TABLE table_name:table KW_ALTER opt_kw_column
ident_or_unreserved:col_name
KW_SET column_options_map:options
{:
RESULT = new AlterTableAlterColStmt(
table, col_name, new ColumnDef(col_name, null, options));
:}
- | KW_ALTER KW_TABLE table_name:table KW_ALTER opt_kw_column
ident_or_default:col_name
+ | KW_ALTER KW_TABLE table_name:table KW_ALTER opt_kw_column
ident_or_unreserved:col_name
KW_DROP KW_DEFAULT
{: RESULT = AlterTableAlterColStmt.createDropDefaultStmt(table, col_name); :}
| KW_ALTER KW_TABLE table_name:table opt_partition_set:partitions KW_SET
IDENT:owner_id
- IDENT:user_id ident_or_default:user
+ IDENT:user_id ident_or_unreserved:user
{:
// See above for special partition clause handling.
if (partitions != null) parser.parseError("set", SqlParserSymbols.KW_SET);
@@ -1490,7 +1512,7 @@ alter_tbl_stmt ::=
RESULT = new AlterTableSetOwnerStmt(table, new Owner(TOwnerType.USER,
user));
:}
| KW_ALTER KW_TABLE table_name:table opt_partition_set:partitions KW_SET
IDENT:owner_id
- KW_ROLE ident_or_default:role
+ KW_ROLE ident_or_unreserved:role
{:
// See above for special partition clause handling.
if (partitions != null) parser.parseError("set", SqlParserSymbols.KW_SET);
@@ -1521,6 +1543,13 @@ convert_tbl_stmt ::=
{: RESULT = new ConvertTableToIcebergStmt(table, props); :}
;
+kill_query_stmt ::=
+ KW_KILL KW_QUERY STRING_LITERAL:query_id
+ {:
+ RESULT = new KillQueryStmt(query_id);
+ :}
+ ;
+
table_property_type ::=
KW_TBLPROPERTIES
{: RESULT = TTablePropertyType.TBL_PROPERTY; :}
@@ -1534,7 +1563,7 @@ opt_kw_column ::=
;
create_db_stmt ::=
- KW_CREATE db_or_schema_kw if_not_exists_val:if_not_exists
ident_or_default:db_name
+ KW_CREATE db_or_schema_kw if_not_exists_val:if_not_exists
ident_or_unreserved:db_name
opt_comment_val:comment location_val:location
managed_location_val:managed_location
{: RESULT = new CreateDbStmt(db_name, comment, location,
managed_location, if_not_exists);
@@ -1645,7 +1674,7 @@ create_tbl_stmt ::=
RESULT = new CreateTableStmt(tbl_def);
:}
| KW_CREATE tbl_def_with_col_defs:tbl_def
- KW_PRODUCED KW_BY KW_DATA source_ident:is_source_id
ident_or_default:data_src_name
+ KW_PRODUCED KW_BY KW_DATA source_ident:is_source_id
ident_or_unreserved:data_src_name
opt_init_string_val:init_string
opt_comment_val:comment
{:
@@ -2083,21 +2112,21 @@ iceberg_partition_field_defs ::=
;
iceberg_partition_field_def ::=
- IDENT:col_name
+ ident_or_unreserved:col_name
{:
RESULT = new IcebergPartitionField(col_name,
IcebergUtil.getPartitionTransform("IDENTITY"));
:}
|
iceberg_partition_transform_type:partition_transform
- LPAREN IDENT:col_name RPAREN
+ LPAREN ident_or_unreserved:col_name RPAREN
{:
RESULT = new IcebergPartitionField(col_name,
IcebergUtil.getPartitionTransform(
partition_transform));
:}
| iceberg_partition_transform_type:partition_transform
LPAREN
- INTEGER_LITERAL:transform_param COMMA IDENT:col_name
+ INTEGER_LITERAL:transform_param COMMA ident_or_unreserved:col_name
RPAREN
{:
RESULT = new IcebergPartitionField(col_name,
IcebergUtil.getPartitionTransform(
@@ -2367,9 +2396,9 @@ column_def_list ::=
;
column_def ::=
- ident_or_default:col_name type_def:type column_options_map:options
+ ident_or_unreserved:col_name type_def:type column_options_map:options
{: RESULT = new ColumnDef(col_name, type, options); :}
- | ident_or_default:col_name type_def:type
+ | ident_or_unreserved:col_name type_def:type
{: RESULT = new ColumnDef(col_name, type); :}
;
@@ -2454,7 +2483,7 @@ create_view_stmt ::=
create_data_src_stmt ::=
KW_CREATE KW_DATA source_ident:is_source_id
- if_not_exists_val:if_not_exists ident_or_default:data_src_name
+ if_not_exists_val:if_not_exists ident_or_unreserved:data_src_name
location_val:location
KW_CLASS STRING_LITERAL:class_name
KW_API_VERSION STRING_LITERAL:api_version
@@ -2574,7 +2603,7 @@ view_column_def_list ::=
;
view_column_def ::=
- ident_or_default:col_name opt_comment_val:comment
+ ident_or_unreserved:col_name opt_comment_val:comment
{:
LinkedHashMap<Option, Object> options = Maps.newLinkedHashMap();
if (comment != null) options.put(Option.COMMENT, comment);
@@ -2589,13 +2618,13 @@ alter_view_stmt ::=
| KW_ALTER KW_VIEW table_name:before_table KW_RENAME KW_TO
table_name:new_table
{: RESULT = new AlterTableOrViewRenameStmt(before_table, new_table, false);
:}
| KW_ALTER KW_VIEW table_name:table KW_SET IDENT:owner_id IDENT:user_id
- ident_or_default:user
+ ident_or_unreserved:user
{:
parser.checkIdentKeyword("OWNER", owner_id);
parser.checkIdentKeyword("USER", user_id);
RESULT = new AlterViewSetOwnerStmt(table, new Owner(TOwnerType.USER,
user));
:}
- | KW_ALTER KW_VIEW table_name:table KW_SET IDENT:owner_id KW_ROLE
ident_or_default:role
+ | KW_ALTER KW_VIEW table_name:table KW_SET IDENT:owner_id KW_ROLE
ident_or_unreserved:role
{:
parser.checkIdentKeyword("OWNER", owner_id);
RESULT = new AlterViewSetOwnerStmt(table, new Owner(TOwnerType.ROLE,
role));
@@ -2647,7 +2676,7 @@ drop_stats_stmt ::=
;
drop_db_stmt ::=
- KW_DROP db_or_schema_kw if_exists_val:if_exists ident_or_default:db_name
+ KW_DROP db_or_schema_kw if_exists_val:if_exists ident_or_unreserved:db_name
cascade_val:cascade
{: RESULT = new DropDbStmt(db_name, if_exists, cascade); :}
;
@@ -2671,7 +2700,7 @@ drop_function_stmt ::=
drop_data_src_stmt ::=
KW_DROP KW_DATA source_ident:is_source_id if_exists_val:if_exists
- ident_or_default:data_src_name
+ ident_or_unreserved:data_src_name
{: RESULT = new DropDataSrcStmt(data_src_name, if_exists); :}
;
@@ -2761,7 +2790,7 @@ static_partition_key_value_list ::=
partition_key_value ::=
// Dynamic partition key values.
- ident_or_default:column
+ ident_or_unreserved:column
{: RESULT = new PartitionKeyValue(column, null); :}
| static_partition_key_value:partition
{: RESULT = partition; :}
@@ -2769,7 +2798,7 @@ partition_key_value ::=
static_partition_key_value ::=
// Static partition key values.
- ident_or_default:column EQUAL expr:e
+ ident_or_unreserved:column EQUAL expr:e
{: RESULT = new PartitionKeyValue(column, e); :}
;
@@ -2903,11 +2932,11 @@ opt_with_clause ::=
;
with_view_def ::=
- ident_or_default:alias KW_AS LPAREN query_stmt:query RPAREN
+ ident_or_unreserved:alias KW_AS LPAREN query_stmt:query RPAREN
{: RESULT = new View(alias, query, null); :}
| STRING_LITERAL:alias KW_AS LPAREN query_stmt:query RPAREN
{: RESULT = new View(alias, query, null); :}
- | ident_or_default:alias LPAREN ident_list:col_names RPAREN KW_AS LPAREN
+ | ident_or_unreserved:alias LPAREN ident_list:col_names RPAREN KW_AS LPAREN
query_stmt:query RPAREN
{: RESULT = new View(alias, query, col_names); :}
| STRING_LITERAL:alias LPAREN ident_list:col_names RPAREN
@@ -3057,7 +3086,7 @@ values_operand_list ::=
;
use_stmt ::=
- KW_USE ident_or_default:db
+ KW_USE ident_or_unreserved:db
{: RESULT = new UseStmt(db); :}
;
@@ -3066,9 +3095,9 @@ show_tables_stmt ::=
{: RESULT = new ShowTablesStmt(); :}
| KW_SHOW KW_TABLES show_pattern:showPattern
{: RESULT = new ShowTablesStmt(showPattern); :}
- | KW_SHOW KW_TABLES KW_IN ident_or_default:db
+ | KW_SHOW KW_TABLES KW_IN ident_or_unreserved:db
{: RESULT = new ShowTablesStmt(db, null); :}
- | KW_SHOW KW_TABLES KW_IN ident_or_default:db show_pattern:showPattern
+ | KW_SHOW KW_TABLES KW_IN ident_or_unreserved:db show_pattern:showPattern
{: RESULT = new ShowTablesStmt(db, showPattern); :}
;
@@ -3086,9 +3115,9 @@ show_views_stmt ::=
{: RESULT = new ShowViewsStmt(); :}
| KW_SHOW KW_VIEWS show_pattern:showPattern
{: RESULT = new ShowViewsStmt(showPattern); :}
- | KW_SHOW KW_VIEWS KW_IN ident_or_default:db
+ | KW_SHOW KW_VIEWS KW_IN ident_or_unreserved:db
{: RESULT = new ShowViewsStmt(db, null); :}
- | KW_SHOW KW_VIEWS KW_IN ident_or_default:db show_pattern:showPattern
+ | KW_SHOW KW_VIEWS KW_IN ident_or_unreserved:db show_pattern:showPattern
{: RESULT = new ShowViewsStmt(db, showPattern); :}
;
@@ -3126,9 +3155,9 @@ show_functions_stmt ::=
{: RESULT = new ShowFunctionsStmt(null, null, fn_type); :}
| KW_SHOW opt_function_category:fn_type KW_FUNCTIONS show_pattern:showPattern
{: RESULT = new ShowFunctionsStmt(null, showPattern, fn_type); :}
- | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN
ident_or_default:db
+ | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN
ident_or_unreserved:db
{: RESULT = new ShowFunctionsStmt(db, null, fn_type); :}
- | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN
ident_or_default:db
+ | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN
ident_or_unreserved:db
show_pattern:showPattern
{: RESULT = new ShowFunctionsStmt(db, showPattern, fn_type); :}
;
@@ -3181,7 +3210,7 @@ show_files_stmt ::=
;
describe_stmt ::=
- KW_DESCRIBE db_or_schema_kw describe_output_style:style ident_or_default:db
+ KW_DESCRIBE db_or_schema_kw describe_output_style:style
ident_or_unreserved:db
{: RESULT = new DescribeDbStmt(db, style); :}
| KW_DESCRIBE KW_FORMATTED dotted_path:path
{: RESULT = new DescribeTableStmt(path, TDescribeOutputStyle.FORMATTED); :}
@@ -3254,15 +3283,15 @@ select_clause ::=
;
set_stmt ::=
- KW_SET ident_or_default:key EQUAL numeric_literal:l
+ KW_SET ident_or_unreserved:key EQUAL numeric_literal:l
{: RESULT = new SetStmt(key, l.getStringValue(), TQueryOptionType.SET_ONE);
:}
- | KW_SET ident_or_default:key EQUAL STRING_LITERAL:l
+ | KW_SET ident_or_unreserved:key EQUAL STRING_LITERAL:l
{: RESULT = new SetStmt(key, l, TQueryOptionType.SET_ONE); :}
- | KW_SET ident_or_default:key EQUAL SUBTRACT numeric_literal:l
+ | KW_SET ident_or_unreserved:key EQUAL SUBTRACT numeric_literal:l
{:
l.swapSign();
RESULT = new SetStmt(key, l.getStringValue(), TQueryOptionType.SET_ONE); :}
- | KW_SET ident_or_default:key EQUAL word:value
+ | KW_SET ident_or_unreserved:key EQUAL word:value
{: RESULT = new SetStmt(key, value, TQueryOptionType.SET_ONE); :}
| KW_SET KW_ALL
{: RESULT = new SetStmt(null, null, TQueryOptionType.SET_ALL); :}
@@ -3277,9 +3306,9 @@ unset_stmt ::=
// Top-level function call, e.g. ": shutdown()", used for admin commands, etc.
admin_fn_stmt ::=
- COLON ident_or_default:fn_name LPAREN RPAREN
+ COLON ident_or_unreserved:fn_name LPAREN RPAREN
{: RESULT = new AdminFnStmt(fn_name, Collections.<Expr>emptyList()); :}
- | COLON ident_or_default:fn_name LPAREN expr_list:params RPAREN
+ | COLON ident_or_unreserved:fn_name LPAREN expr_list:params RPAREN
{: RESULT = new AdminFnStmt(fn_name, params); :}
;
@@ -3307,9 +3336,9 @@ select_list_item ::=
;
alias_clause ::=
- KW_AS ident_or_default:ident
+ KW_AS ident_or_unreserved:ident
{: RESULT = ident; :}
- | ident_or_default:ident
+ | ident_or_unreserved:ident
{: RESULT = ident; :}
| KW_AS STRING_LITERAL:l
{: RESULT = l; :}
@@ -3325,16 +3354,16 @@ star_expr ::=
;
table_name ::=
- ident_or_default:tbl
+ ident_or_unreserved:tbl
{: RESULT = new TableName(null, tbl); :}
- | ident_or_default:db DOT ident_or_default:tbl
+ | ident_or_unreserved:db DOT ident_or_unreserved:tbl
{: RESULT = new TableName(db, tbl); :}
;
column_name ::=
- | ident_or_default:tbl DOT ident_or_default:col
+ | ident_or_unreserved:tbl DOT ident_or_unreserved:col
{: RESULT = new ColumnName(new TableName(null, tbl), col); :}
- | ident_or_default:db DOT ident_or_default:tbl DOT ident_or_default:col
+ | ident_or_unreserved:db DOT ident_or_unreserved:tbl DOT
ident_or_unreserved:col
{: RESULT = new ColumnName(new TableName(db, tbl), col); :}
;
@@ -3546,11 +3575,11 @@ plan_hint ::=
{: RESULT = new PlanHint("straight_join"); :}
| KW_CLUSTERED
{: RESULT = new PlanHint("clustered"); :}
- | IDENT:name
+ | ident_or_unreserved:name
{: RESULT = new PlanHint(name); :}
- | IDENT:name LPAREN ident_list:args RPAREN
+ | ident_or_unreserved:name LPAREN ident_list:args RPAREN
{: RESULT = new PlanHint(name, args); :}
- | IDENT:name LPAREN INTEGER_LITERAL:value RPAREN
+ | ident_or_unreserved:name LPAREN INTEGER_LITERAL:value RPAREN
{:
if (value != null) {
RESULT = new PlanHint(name,
@@ -3577,13 +3606,13 @@ plan_hint_list ::=
;
ident_list ::=
- ident_or_default:ident
+ ident_or_unreserved:ident
{:
List<String> list = new ArrayList<>();
list.add(ident);
RESULT = list;
:}
- | ident_list:list COMMA ident_or_default:ident
+ | ident_list:list COMMA ident_or_unreserved:ident
{:
list.add(ident);
RESULT = list;
@@ -3913,7 +3942,7 @@ function_call_expr ::=
// avoid adding new keywords.
// Common syntax for "EXTRACT(unit FROM TIMESTAMP/DATE ts)"
// and "TRIM(where FROM string)":
- | function_name:fn_name LPAREN ident_or_default:i KW_FROM expr:e RPAREN
+ | function_name:fn_name LPAREN ident_or_unreserved:i KW_FROM expr:e RPAREN
{:
if (fn_name.toString().toLowerCase().endsWith("extract")) {
RESULT = new ExtractFromExpr(fn_name, i, e);
@@ -4264,13 +4293,13 @@ slot_ref ::=
;
dotted_path ::=
- ident_or_default:ident
+ ident_or_unreserved:ident
{:
List<String> list = new ArrayList<>();
list.add(ident);
RESULT = list;
:}
- | dotted_path:list DOT ident_or_default:ident
+ | dotted_path:list DOT ident_or_unreserved:ident
{:
list.add(ident);
RESULT = list;
@@ -4349,11 +4378,17 @@ struct_field_def_list ::=
:}
;
-ident_or_default ::=
+// NOTE: If a new keyword is added here, it should also be added to the
+// isUnreservedKeyword() method.
+ident_or_unreserved ::=
IDENT:name
{: RESULT = name.toString(); :}
| KW_DEFAULT:name
{: RESULT = name.toString(); :}
+ | KW_KILL:name
+ {: RESULT = name.toString(); :}
+ | KW_QUERY:name
+ {: RESULT = name.toString(); :}
;
word ::=
@@ -4587,6 +4622,8 @@ word ::=
{: RESULT = r.toString(); :}
| KW_JSONFILE:r
{: RESULT = r.toString(); :}
+ | KW_KILL:r
+ {: RESULT = r.toString(); :}
| KW_KUDU:r
{: RESULT = r.toString(); :}
| KW_LAST:r
@@ -4675,6 +4712,8 @@ word ::=
{: RESULT = r.toString(); :}
| KW_PURGE:r
{: RESULT = r.toString(); :}
+ | KW_QUERY:r
+ {: RESULT = r.toString(); :}
| KW_RANGE:r
{: RESULT = r.toString(); :}
| KW_RCFILE:r
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index 6f28c3a9f..b23dc9a71 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -224,6 +224,10 @@ public class AnalysisContext {
return stmt_ instanceof ConvertTableToIcebergStmt;
}
+ public boolean isKillQueryStmt() {
+ return stmt_ instanceof KillQueryStmt;
+ }
+
public AlterTableStmt getAlterTableStmt() {
Preconditions.checkState(isAlterTableStmt());
return (AlterTableStmt) stmt_;
@@ -418,6 +422,11 @@ public class AnalysisContext {
return (ConvertTableToIcebergStmt) stmt_;
}
+ public KillQueryStmt getKillQueryStmt() {
+ Preconditions.checkState(isKillQueryStmt());
+ return (KillQueryStmt) stmt_;
+ }
+
public StatementBase getStmt() { return stmt_; }
public Analyzer getAnalyzer() { return analyzer_; }
public Set<TAccessEvent> getAccessEvents() { return
analyzer_.getAccessEvents(); }
diff --git a/fe/src/main/java/org/apache/impala/analysis/KillQueryStmt.java
b/fe/src/main/java/org/apache/impala/analysis/KillQueryStmt.java
new file mode 100644
index 000000000..b344d9ff6
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KillQueryStmt.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
+import org.apache.impala.authorization.AuthorizationConfig;
+import org.apache.impala.authorization.User;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.TKillQueryReq;
+import org.apache.impala.thrift.TUniqueId;
+import org.apache.impala.util.TUniqueIdUtil;
+
+import com.google.common.base.Preconditions;
+
+public final class KillQueryStmt extends StatementBase {
+ private final String queryIdString_;
+ private TUniqueId queryId_;
+ private User requestingUser_;
+ private boolean requestedByAdmin_ = true;
+
+ public KillQueryStmt(String queryId) {
+ queryIdString_ = queryId;
+ }
+
+ @Override
+ public String toSql(ToSqlOptions options) {
+ return String.format("KILL QUERY '%s'", queryIdString_);
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ super.analyze(analyzer);
+ try {
+ queryId_ = TUniqueIdUtil.ParseId(queryIdString_);
+ } catch (NumberFormatException e) {
+ throw new AnalysisException(
+ String.format("Invalid query id: '%s'", queryIdString_));
+ }
+ requestingUser_ = analyzer.getUser();
+ AuthorizationConfig authzConfig = analyzer.getAuthzConfig();
+ if (authzConfig.isEnabled()) {
+ // Check whether the user is an admin (i.e. user with ALL privilege on
server).
+ String authzServer = authzConfig.getServerName();
+ Preconditions.checkNotNull(authzServer);
+ analyzer.setMaskPrivChecks(null);
+ analyzer.registerPrivReq(builder ->
builder.onServer(authzServer).all().build());
+ }
+ }
+
+ public TKillQueryReq toThrift() {
+ return new TKillQueryReq(queryId_, requestingUser_.getName(),
requestedByAdmin_);
+ }
+
+ @Override
+ public void handleAuthorizationException(AnalysisResult analysisResult) {
+ requestedByAdmin_ = false;
+
+ // By default, a user will not be authorized to access the runtime profile
if a
+ // masked privilege request fails. This will disallow any non-admin user
to access
+ // the runtime profile even if the user can kill the query.
+ //
+ // Set to true to allow non-admin users to access the runtime profile.
+ analysisResult.setUserHasProfileAccess(true);
+ }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
index 8f45c0e40..7512e2026 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.NotImplementedException;
+import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.Type;
@@ -274,4 +275,27 @@ public abstract class StatementBase extends StmtNode {
}
return srcExpr.castTo(compatType, compatibilityLevel);
}
+
+ /**
+ * This method is designed to allow custom authorization exception handling.
A
+ * statement class that extends StatementBase can customize how it handles an
+ * authorization exception by
+ * 1. Registering a masked privilege request and set the associated error
message to
+ * null, so that this custom handler will be called when an authorization
exception
+ * is thrown;
+ * 2. Overriding this method to specify how to handle the authorization
exception for
+ * this statement class.
+ *
+ * By default, the handler ignores the exception. If a statement class
registers only
+ * masked authorization exceptions with error message set to null, it might
need to
+ * override this method.
+ *
+ * NOTE: Currently access to the runtime profile is set to disabled when a
masked
+ * authorization exception is thrown before this handler gets called (Refer
to
+ * BaseAuthorizationChecker.authorize() for details).
+ * @param analysisResult
+ */
+ public void handleAuthorizationException(AnalysisResult analysisResult) {
+ // Do nothing
+ }
}
diff --git
a/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
b/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
index d4ea0df53..d43aa76f4 100644
---
a/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
+++
b/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
@@ -177,10 +177,9 @@ public abstract class BaseAuthorizationChecker implements
AuthorizationChecker {
}
// Check all masked requests. If a masked request has an associated error
message,
- // an AuthorizationException is thrown if authorization fails. Masked
requests with
- // no error message are used to check if the user can access the runtime
profile.
- // These checks don't result in an AuthorizationException but set the
- // 'user_has_profile_access' flag in queryCtx_.
+ // an AuthorizationException is thrown if authorization fails. Otherwise,
the custom
+ // AuthorizationException handler of the statement will be called and the
user will
+ // not be authorized to access the runtime profile by default if
authorization fails.
for (Pair<PrivilegeRequest, String> maskedReq :
analyzer.getMaskedPrivilegeReqs()) {
try {
authzCtx.setRetainAudits(false);
@@ -189,6 +188,8 @@ public abstract class BaseAuthorizationChecker implements
AuthorizationChecker {
analysisResult.setUserHasProfileAccess(false);
if (!Strings.isNullOrEmpty(maskedReq.second)) {
throw new AuthorizationException(maskedReq.second);
+ } else {
+
analysisResult.getStmt().handleAuthorizationException(analysisResult);
}
break;
} finally {
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 539d5dedd..4f5146dfc 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -2711,6 +2711,12 @@ public class Frontend {
createCatalogOpRequest(analysisResult, result);
}
return result;
+ } else if (analysisResult.isKillQueryStmt()) {
+ result.stmt_type = TStmtType.KILL;
+ result.setResult_set_metadata(new TResultSetMetadata(
+ Collections.singletonList(new TColumn("result",
Type.STRING.toThrift()))));
+
result.setKill_query_request(analysisResult.getKillQueryStmt().toThrift());
+ return result;
}
// Open or continue Kudu transaction if Kudu transaction is enabled and
target table
diff --git a/fe/src/main/jflex/sql-scanner.flex
b/fe/src/main/jflex/sql-scanner.flex
index bfbc3c72f..be7c86811 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -179,6 +179,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
keywordMap.put("jdbc", SqlParserSymbols.KW_JDBC);
keywordMap.put("join", SqlParserSymbols.KW_JOIN);
keywordMap.put("jsonfile", SqlParserSymbols.KW_JSONFILE);
+ keywordMap.put("kill", SqlParserSymbols.KW_KILL);
keywordMap.put("kudu", SqlParserSymbols.KW_KUDU);
keywordMap.put("last", SqlParserSymbols.KW_LAST);
keywordMap.put("leading", SqlParserSymbols.KW_LEADING);
@@ -223,6 +224,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
keywordMap.put("primary", SqlParserSymbols.KW_PRIMARY);
keywordMap.put("produced", SqlParserSymbols.KW_PRODUCED);
keywordMap.put("purge", SqlParserSymbols.KW_PURGE);
+ keywordMap.put("query", SqlParserSymbols.KW_QUERY);
keywordMap.put("range", SqlParserSymbols.KW_RANGE);
keywordMap.put("rcfile", SqlParserSymbols.KW_RCFILE);
keywordMap.put("real", SqlParserSymbols.KW_DOUBLE);
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 80d503308..f86bc96d7 100755
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -3708,8 +3708,9 @@ public class ParserTest extends FrontendTestBase {
"^\n" +
"Encountered: IDENTIFIER\n" +
"Expected: ALTER, COMMENT, COMPUTE, COPY, CREATE, DELETE, DESCRIBE,
DROP, " +
- "EXPLAIN, GRANT, INSERT, INVALIDATE, LOAD, MERGE, OPTIMIZE, REFRESH,
REVOKE, " +
- "SELECT, SET, SHOW, TRUNCATE, UNSET, UPDATE, UPSERT, USE, VALUES,
WITH\n");
+ "EXPLAIN, GRANT, INSERT, INVALIDATE, KILL, LOAD, MERGE, OPTIMIZE,
REFRESH, " +
+ "REVOKE, SELECT, SET, SHOW, TRUNCATE, UNSET, UPDATE, UPSERT, USE,
VALUES, " +
+ "WITH\n");
// missing select list
ParserError("select from t",
@@ -3717,7 +3718,7 @@ public class ParserTest extends FrontendTestBase {
"select from t\n" +
" ^\n" +
"Encountered: FROM\n" +
- "Expected: ALL, CASE, CAST, DATE, DEFAULT, DISTINCT, EXISTS, FALSE,
GROUPING, " +
+ "Expected: ALL, CASE, CAST, DATE, DISTINCT, EXISTS, FALSE, GROUPING, "
+
"IF, INTERVAL, LEFT, NOT, NULL, REPLACE, RIGHT, STRAIGHT_JOIN,
TRUNCATE, TRUE, " +
"UNNEST, IDENTIFIER\n" +
"\n" +
@@ -3730,7 +3731,7 @@ public class ParserTest extends FrontendTestBase {
"select c, b, c where a = 5\n" +
" ^\n" +
"Encountered: WHERE\n" +
- "Expected: AND, AS, BETWEEN, DEFAULT, DIV, EXCEPT, FROM, ILIKE, IN,
INTERSECT, " +
+ "Expected: AND, AS, BETWEEN, DIV, EXCEPT, FROM, ILIKE, IN, INTERSECT,
" +
"IREGEXP, IS, LIKE, LIMIT, ||, MINUS, NOT, OR, ORDER, REGEXP, RLIKE,
UNION, " +
"COMMA, IDENTIFIER\n" +
"\n" +
@@ -3743,7 +3744,7 @@ public class ParserTest extends FrontendTestBase {
"select c, b, c from where a = 5\n" +
" ^\n" +
"Encountered: WHERE\n" +
- "Expected: DEFAULT, UNNEST, IDENTIFIER\n" +
+ "Expected: UNNEST, IDENTIFIER\n" +
"\n" +
"Hint: reserved words have to be escaped when used as an identifier,
e.g. `where`"
);
@@ -3754,7 +3755,7 @@ public class ParserTest extends FrontendTestBase {
"select c, b, c from t where\n" +
" ^\n" +
"Encountered: EOF\n" +
- "Expected: CASE, CAST, DATE, DEFAULT, EXISTS, FALSE, GROUPING, IF,
INTERVAL, " +
+ "Expected: CASE, CAST, DATE, EXISTS, FALSE, GROUPING, IF, INTERVAL, " +
"LEFT, NOT, NULL, REPLACE, RIGHT, STRAIGHT_JOIN, TRUNCATE, TRUE,
UNNEST, " +
"IDENTIFIER");
@@ -3764,7 +3765,7 @@ public class ParserTest extends FrontendTestBase {
"select c, b, c from t where group by a, b\n" +
" ^\n" +
"Encountered: GROUP\n" +
- "Expected: CASE, CAST, DATE, DEFAULT, EXISTS, FALSE, GROUPING, IF,
INTERVAL, " +
+ "Expected: CASE, CAST, DATE, EXISTS, FALSE, GROUPING, IF, INTERVAL, " +
"LEFT, NOT, NULL, REPLACE, RIGHT, STRAIGHT_JOIN, TRUNCATE, TRUE,
UNNEST, " +
"IDENTIFIER\n" +
"\n" +
@@ -3831,7 +3832,7 @@ public class ParserTest extends FrontendTestBase {
"...c,c,c,c,c,c,c,c,cd,c,d,d, ,c, from t\n" +
" ^\n" +
"Encountered: COMMA\n" +
- "Expected: CASE, CAST, DATE, DEFAULT, EXISTS, FALSE, GROUPING, IF,
INTERVAL, " +
+ "Expected: CASE, CAST, DATE, EXISTS, FALSE, GROUPING, IF, INTERVAL, " +
"LEFT, NOT, NULL, REPLACE, RIGHT, TRUNCATE, TRUE, UNNEST, IDENTIFIER");
// Parsing identifiers that have different names printed as EXPECTED
@@ -3852,7 +3853,7 @@ public class ParserTest extends FrontendTestBase {
"USE ` `\n" +
" ^\n" +
"Encountered: EMPTY IDENTIFIER\n" +
- "Expected: DEFAULT, IDENTIFIER\n");
+ "Expected: IDENTIFIER\n");
// Expecting = token
ParserError("SET foo",
@@ -3868,7 +3869,7 @@ public class ParserTest extends FrontendTestBase {
"\n" +
"^\n" +
"Encountered: EOF\n" +
- "Expected: ALL, CASE, CAST, DATE, DEFAULT, DISTINCT, EXISTS, FALSE,
GROUPING, " +
+ "Expected: ALL, CASE, CAST, DATE, DISTINCT, EXISTS, FALSE, GROUPING,
" +
"IF, INTERVAL, LEFT, NOT, NULL, REPLACE, RIGHT, " +
"STRAIGHT_JOIN, TRUNCATE, TRUE, UNNEST, IDENTIFIER\n");
ParserError("SELECT\n\n",
@@ -3876,7 +3877,7 @@ public class ParserTest extends FrontendTestBase {
"\n" +
"^\n" +
"Encountered: EOF\n" +
- "Expected: ALL, CASE, CAST, DATE, DEFAULT, DISTINCT, EXISTS, FALSE,
GROUPING, " +
+ "Expected: ALL, CASE, CAST, DATE, DISTINCT, EXISTS, FALSE, GROUPING,
" +
"IF, INTERVAL, LEFT, NOT, NULL, REPLACE, RIGHT, " +
"STRAIGHT_JOIN, TRUNCATE, TRUE, UNNEST, IDENTIFIER\n");
}
@@ -4549,4 +4550,14 @@ public class ParserTest extends FrontendTestBase {
ParsesOk("--test\nSELECT 1\n");
ParsesOk("--test\nSELECT 1\n ");
}
+
+ @Test
+ public void TestUnreservedKeywords() {
+ // Test if "unreserved keywords" can be used as identifiers, such as table
names and
+ // column names.
+ final String[] unreservedKeywords = { "DEFAULT", "KILL", "QUERY" };
+ for (String keyword : unreservedKeywords) {
+ ParsesOk(String.format("CREATE TABLE %s (%s INT);", keyword, keyword));
+ }
+ }
}
diff --git a/tests/common/cluster_config.py b/tests/common/cluster_config.py
new file mode 100644
index 000000000..305d74a8c
--- /dev/null
+++ b/tests/common/cluster_config.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Common cluster configurations as decorators for custom cluster tests
+
+from __future__ import absolute_import, division, print_function
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
+# Same as in tests/authorization/test_ranger.py
+ADMIN = "admin"
+
+enable_authorization = CustomClusterTestSuite.with_args(
+ # Same as IMPALAD_ARGS and CATALOGD_ARGS in
tests/authorization/test_ranger.py
+ impalad_args="--server-name=server1 --ranger_service_type=hive "
+ "--ranger_app_id=impala --authorization_provider=ranger",
+ catalogd_args="--server-name=server1 --ranger_service_type=hive "
+ "--ranger_app_id=impala --authorization_provider=ranger"
+)
+
+
+def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
+ proc_mem_limit=None,
queue_wait_timeout_ms=None,
+ admission_control_slots=None,
executor_groups=None,
+ codegen_cache_capacity=0):
+ extra_flags = ""
+ if proc_mem_limit is not None:
+ extra_flags += " -mem_limit={0}".format(proc_mem_limit)
+ if queue_wait_timeout_ms is not None:
+ extra_flags += " -queue_wait_timeout_ms={0}".format(queue_wait_timeout_ms)
+ if admission_control_slots is not None:
+ extra_flags += "
-admission_control_slots={0}".format(admission_control_slots)
+ if executor_groups is not None:
+ extra_flags += " -executor_groups={0}".format(executor_groups)
+ extra_flags += " -codegen_cache_capacity={0}".format(codegen_cache_capacity)
+
+ return ("-vmodule admission-controller=3 -default_pool_max_requests {0} "
+ "-default_pool_max_queued {1} -default_pool_mem_limit {2}
{3}".format(
+ max_requests, max_queued, pool_max_mem, extra_flags))
+
+
+admit_one_query_at_a_time = CustomClusterTestSuite.with_args(
+ impalad_args=impalad_admission_ctrl_flags(1, 1, 0)
+)
+admit_no_query = CustomClusterTestSuite.with_args(
+ impalad_args=impalad_admission_ctrl_flags(0, 0, 0)
+)
+single_coordinator = CustomClusterTestSuite.with_args(
+ num_exclusive_coordinators=1
+)
diff --git a/tests/common/custom_cluster_test_suite.py
b/tests/common/custom_cluster_test_suite.py
index c445d37f4..d0d3275b8 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -107,7 +107,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
TMP_DIRS = dict()
# Args for cluster startup/teardown when sharing a single cluster for the
entire class.
- SHARED_CLUSTER_ARGS = None
+ SHARED_CLUSTER_ARGS = {}
@classmethod
def get_workload(cls):
@@ -199,14 +199,35 @@ class CustomClusterTestSuite(ImpalaTestSuite):
if log_symlinks:
args[LOG_SYMLINKS] = True
+ def merge_args(args_first, args_last):
+ result = args_first.copy()
+ for key in args_last:
+ if key not in result:
+ result[key] = args_last[key]
+ else:
+ if key in (
+ IMPALAD_ARGS,
+ STATESTORED_ARGS,
+ CATALOGD_ARGS,
+ START_ARGS,
+ JVM_ARGS,
+ KUDU_ARGS
+ ):
+ # Concatenate both flag strings; the server resolves duplicate flags
+ result[key] = " ".join((result[key], args_last[key]))
+ else:
+ # Last writer wins.
+ result[key] = args_last[key]
+ return result
+
def decorate(obj):
"""If obj is a class, set SHARED_CLUSTER_ARGS for setup/teardown_class.
Otherwise
add to the function __dict__ for setup/teardown_method."""
if inspect.isclass(obj):
- obj.SHARED_CLUSTER_ARGS = args
+ obj.SHARED_CLUSTER_ARGS = merge_args(obj.SHARED_CLUSTER_ARGS, args)
else:
obj.__dict__[WITH_ARGS_METHOD] = True
- obj.__dict__.update(args)
+ obj.__dict__ = merge_args(obj.__dict__, args)
return obj
return decorate
diff --git a/tests/common/impala_connection.py
b/tests/common/impala_connection.py
index bc81ebb76..262eaaa35 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -89,6 +89,17 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)):
def __exit__(self, exc_type, exc_value, traceback):
self.close()
+ @abc.abstractmethod
+ def get_test_protocol(self):
+ """Return the client protocol name specific to the Impala test framework.
+ Possible return values are 'beeswax', 'hs2', or 'hs2-http'."""
+ pass
+
+ @abc.abstractmethod
+ def get_host_port(self):
+ """Return the 'host:port' string of the impala server that this object is
connecting to."""
+ pass
+
@abc.abstractmethod
def set_configuration_option(self, name, value):
"""Sets a configuration option name to the given value"""
@@ -188,6 +199,12 @@ class BeeswaxConnection(ImpalaConnection):
self.__host_port = host_port
self.QUERY_STATES = self.__beeswax_client.query_states
+ def get_test_protocol(self):
+ return 'beeswax'
+
+ def get_host_port(self):
+ return self.__host_port
+
def set_configuration_option(self, name, value):
# Only set the option if it's not already set to the same value.
if self.__beeswax_client.get_query_option(name) != value:
@@ -287,6 +304,9 @@ class BeeswaxConnection(ImpalaConnection):
handle_id = self.handle_id(operation_handle)
LOG.info("-- {0}: {1}".format(handle_id, message))
+ def get_query_id(self, operation_handle):
+ return operation_handle.get_handle().id
+
class ImpylaHS2Connection(ImpalaConnection):
"""Connection to Impala using the impyla client connecting to HS2 endpoint.
@@ -316,6 +336,15 @@ class ImpylaHS2Connection(ImpalaConnection):
# profile and log from Impala.
self._collect_profile_and_log = collect_profile_and_log
+ def get_test_protocol(self):
+ if self.__http_path:
+ return 'hs2-http'
+ else:
+ return 'hs2'
+
+ def get_host_port(self):
+ return self.__host_port
+
def set_configuration_option(self, name, value):
self.__query_options[name] = str(value)
diff --git a/tests/custom_cluster/test_admission_controller.py
b/tests/custom_cluster/test_admission_controller.py
index 71a204b6c..83c6b09ed 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -34,6 +34,7 @@ from time import sleep, time
from beeswaxd.BeeswaxService import QueryState
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.cluster_config import impalad_admission_ctrl_flags
from tests.common.custom_cluster_test_suite import (
ADMISSIOND_ARGS,
IMPALAD_ARGS,
@@ -143,26 +144,6 @@ INITIAL_QUEUE_REASON_REGEX = \
RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test",
"resources")
-def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
- proc_mem_limit=None,
queue_wait_timeout_ms=None,
- admission_control_slots=None,
executor_groups=None,
- codegen_cache_capacity=0):
- extra_flags = ""
- if proc_mem_limit is not None:
- extra_flags += " -mem_limit={0}".format(proc_mem_limit)
- if queue_wait_timeout_ms is not None:
- extra_flags += " -queue_wait_timeout_ms={0}".format(queue_wait_timeout_ms)
- if admission_control_slots is not None:
- extra_flags += "
-admission_control_slots={0}".format(admission_control_slots)
- if executor_groups is not None:
- extra_flags += " -executor_groups={0}".format(executor_groups)
- extra_flags += " -codegen_cache_capacity={0}".format(codegen_cache_capacity)
-
- return ("-vmodule admission-controller=3 -default_pool_max_requests {0} "
- "-default_pool_max_queued {1} -default_pool_mem_limit {2}
{3}".format(
- max_requests, max_queued, pool_max_mem, extra_flags))
-
-
def impalad_admission_ctrl_config_args(fs_allocation_file, llama_site_file,
additional_args="", make_copy=False):
fs_allocation_path = os.path.join(RESOURCES_DIR, fs_allocation_file)
diff --git a/tests/custom_cluster/test_kill_query.py
b/tests/custom_cluster/test_kill_query.py
new file mode 100644
index 000000000..1e035a75a
--- /dev/null
+++ b/tests/custom_cluster/test_kill_query.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+
+import pytest
+
+import tests.common.cluster_config as cluster_config
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.test_result_verifier import error_msg_expected
+from tests.util.cancel_util import (
+ QueryToKill,
+ assert_kill_error,
+ assert_kill_ok
+)
+
+
+class TestKillQuery(CustomClusterTestSuite):
+ @pytest.mark.execute_serially
+ def test_coordinator_unreachable(self):
+ """
+ The coordinator of the query to kill is unreachable.
+
+ It is required that each impalad in the cluster is a coordinator.
+ """
+ protocol = 'hs2'
+ with self.create_client_for_nth_impalad(0, protocol) as client, \
+ QueryToKill(
+ self,
+ protocol,
+ check_on_exit=False,
+ nth_impalad=2) as query_id_to_kill:
+ coordinator_to_kill = self.cluster.impalads[2]
+ coordinator_to_kill.kill()
+ assert_kill_error(
+ client,
+ "KillQuery() RPC failed: Network error:",
+ query_id=query_id_to_kill,
+ )
+
+ @pytest.mark.execute_serially
+ def test_another_coordinator_unreachable(self):
+ """
+ A coordinator other than the one of the query to kill is unreachable.
+
+ It is required that each impalad in the cluster is a coordinator.
+ """
+ protocol = 'hs2'
+ with self.create_client_for_nth_impalad(0, protocol) as client, \
+ QueryToKill(self, protocol, nth_impalad=2) as query_id_to_kill:
+ coordinator_to_kill = self.cluster.impalads[1] # impalad 1 is between 0
and 2.
+ coordinator_to_kill.kill()
+ assert_kill_ok(client, query_id_to_kill)
+
+ @pytest.mark.execute_serially
+ @cluster_config.single_coordinator
+ def test_single_coordinator(self):
+ """
+ Test when there is only one coordinator in the cluster.
+ """
+ protocol = 'hs2'
+ with self.create_client_for_nth_impalad(0, protocol) as client:
+ assert_kill_error(
+ client,
+ "Could not find query on any coordinator.",
+ query_id='123:456')
+
+ @pytest.mark.execute_serially
+ @cluster_config.admit_one_query_at_a_time
+ def test_admit_one_query_at_a_time(self):
+ """
+ Make sure queries can be killed when only one query is allowed to run at a
time.
+ """
+ protocol = 'hs2'
+ with self.create_client_for_nth_impalad(0, protocol) as client, \
+ QueryToKill(self, protocol) as query_id_to_kill:
+ assert_kill_ok(client, query_id_to_kill)
+
+ @pytest.mark.execute_serially
+ @cluster_config.admit_no_query
+ def test_admit_no_query(self):
+ """
+ Make sure KILL QUERY statement can be executed when no query will be
admitted.
+
+ This is to show that KILL QUERY statements are not subject to admission
control.
+ """
+ protocol = 'hs2'
+ with self.create_client_for_nth_impalad(0, protocol) as client:
+ try:
+ client.execute("SELECT 1")
+ except Exception as e:
+ expected_msg = (
+ "Rejected query from pool default-pool: "
+ "disabled by requests limit set to 0"
+ )
+ assert error_msg_expected(str(e), expected_msg)
+ assert_kill_error(
+ client,
+ "Could not find query on any coordinator",
+ query_id="123:456"
+ )
+
+
+@cluster_config.enable_authorization
+class TestKillQueryAuthorization(CustomClusterTestSuite):
+ @pytest.mark.execute_serially
+ def test_kill_as_admin(self):
+ # ImpylaHS2Connection does not support authentication yet.
+ protocol = 'beeswax'
+ with self.create_client_for_nth_impalad(0, protocol) as client, \
+ QueryToKill(self, protocol, user="user1") as query_id_to_kill:
+ assert_kill_ok(client, query_id_to_kill, user=cluster_config.ADMIN)
+
+ @pytest.mark.execute_serially
+ def test_kill_as_non_admin(self):
+ # ImpylaHS2Connection does not support authentication yet.
+ protocol = 'beeswax'
+ user1, user2 = "user1", "user2"
+ with self.create_client_for_nth_impalad(0, protocol) as user1_client, \
+ self.create_client_for_nth_impalad(0, protocol) as user2_client, \
+ QueryToKill(self, protocol, user=user1) as query_id_to_kill:
+ assert_kill_error(
+ user2_client,
+ "User '{0}' is not authorized to kill the query.".format(user2),
+ query_id=query_id_to_kill,
+ user=user2,
+ )
+ assert_kill_ok(user1_client, query_id_to_kill, user=user1)
diff --git a/tests/query_test/test_cancellation.py
b/tests/query_test/test_cancellation.py
index 21fc59497..342a39f03 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -82,6 +82,11 @@ SORT_BUFFER_POOL_LIMIT = ['0', '300m'] # Test spilling and
non-spilling sorts.
# Test with and without multithreading
MT_DOP_VALUES = [0, 4]
+ # Whether to use the KILL QUERY statement to cancel queries.
+# False: Send the Thrift RPCs directly.
+# True: Execute a KILL QUERY statement.
+USE_KILL_QUERY_STATEMENT = [False, True]
+
class TestCancellation(ImpalaTestSuite):
@classmethod
def get_workload(self):
@@ -108,6 +113,8 @@ class TestCancellation(ImpalaTestSuite):
ImpalaTestDimension('cpu_limit_s', *CPU_LIMIT_S))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('mt_dop', *MT_DOP_VALUES))
+ cls.ImpalaTestMatrix.add_dimension(
+ ImpalaTestDimension('use_kill_query_statement',
*USE_KILL_QUERY_STATEMENT))
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('query_type') != 'CTAS' or (\
@@ -119,6 +126,11 @@ class TestCancellation(ImpalaTestSuite):
cls.ImpalaTestMatrix.add_constraint(
lambda v: not (v.get_value('query_type') == 'CTAS' and
v.get_value('query').startswith('compute stats')))
+ # 'use_kill_query_statement' and 'join_before_close' cannot both be True,
since
+ # the KILL QUERY statement also closes the query.
+ cls.ImpalaTestMatrix.add_constraint(
+ lambda v: not (v.get_value('use_kill_query_statement')
+ and v.get_value('join_before_close')))
# Ignore CTAS on Kudu if there is no PRIMARY KEY specified.
cls.ImpalaTestMatrix.add_constraint(
@@ -167,7 +179,8 @@ class TestCancellation(ImpalaTestSuite):
for i in range(NUM_CANCELATION_ITERATIONS):
cancel_query_and_validate_state(self.client, query,
vector.get_value('exec_option'), vector.get_value('table_format'),
- vector.get_value('cancel_delay'),
vector.get_value('join_before_close'))
+ vector.get_value('cancel_delay'),
vector.get_value('join_before_close'),
+
use_kill_query_statement=vector.get_value('use_kill_query_statement'))
if query_type == "CTAS":
self.cleanup_test_table(vector.get_value('table_format'))
diff --git a/tests/query_test/test_kill_query.py
b/tests/query_test/test_kill_query.py
new file mode 100644
index 000000000..c456c3fe5
--- /dev/null
+++ b/tests/query_test/test_kill_query.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_dimensions import create_client_protocol_dimension
+from tests.util.cancel_util import (
+ QueryToKill,
+ assert_kill_error,
+ assert_kill_ok
+)
+
+
+class TestKillQuery(ImpalaTestSuite):
+ @classmethod
+ def add_test_dimensions(cls):
+ cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
+
+ def test_same_coordinator(self, vector):
+ protocol = vector.get_value('protocol')
+ with self.create_client_for_nth_impalad(0, protocol) as client, \
+ QueryToKill(self, protocol) as query_id_to_kill:
+ assert_kill_ok(client, query_id_to_kill)
+
+ def test_different_coordinator(self, vector):
+ protocol = vector.get_value('protocol')
+ with self.create_client_for_nth_impalad(1, protocol) as client, \
+ QueryToKill(self, protocol, nth_impalad=0) as query_id_to_kill:
+ assert_kill_ok(client, query_id_to_kill)
+
+ def test_invalid_query_id(self, vector):
+ with self.create_impala_client(protocol=vector.get_value('protocol')) as
client:
+ assert_kill_error(
+ client,
+ "ParseException: Syntax error",
+ sql="KILL QUERY 123:456",
+ )
+ assert_kill_error(
+ client,
+ "AnalysisException: Invalid query id: ''",
+ sql="KILL QUERY ''",
+ )
+ assert_kill_error(
+ client,
+ "AnalysisException: Invalid query id: 'f:g'",
+ sql="KILL QUERY 'f:g'",
+ )
+ assert_kill_error(
+ client,
+ "AnalysisException: Invalid query id: '123'",
+ sql="KILL QUERY '123'",
+ )
+
+ def test_idempotence(self, vector):
+ protocol = vector.get_value('protocol')
+ with self.create_impala_client(protocol=protocol) as client, \
+ QueryToKill(self, protocol) as query_id_to_kill:
+ assert_kill_ok(client, query_id_to_kill)
+ assert_kill_error(
+ client,
+ "Could not find query on any coordinator",
+ query_id=query_id_to_kill,
+ )
+ assert_kill_error(
+ client,
+ "Could not find query on any coordinator",
+ query_id=query_id_to_kill,
+ )
diff --git a/tests/util/cancel_util.py b/tests/util/cancel_util.py
index d2f011824..5957ce757 100644
--- a/tests/util/cancel_util.py
+++ b/tests/util/cancel_util.py
@@ -19,18 +19,91 @@ from __future__ import absolute_import, division,
print_function
import threading
from time import sleep
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_connection import create_connection
from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_result_verifier import error_msg_expected
+
+
+class QueryToKill:
+ def __init__(self, test_suite, protocol, check_on_exit=True, user=None,
nth_impalad=0):
+ self.client = test_suite.create_client_for_nth_impalad(nth_impalad,
protocol)
+ self.sql = 'SELECT sleep(1000)'
+ self.check_on_exit = check_on_exit
+ self.user = user
+
+ def poll(self):
+ while True:
+ try:
+ results = self.client.fetch(self.sql, self.handle)
+ if len(results.data) > 0:
+ raise Exception("Failed to kill query within time limit.")
+ except Exception as e:
+ self.exc = e
+ return
+
+ def __enter__(self):
+ self.handle = self.client.execute_async(self.sql, user=self.user)
+ self.poll_thread = threading.Thread(target=lambda: self.poll())
+ self.poll_thread.start()
+ return self.client.get_query_id(self.handle)
+
+ def __exit__(self, exc_type, exc_value, traceback): # noqa: U100
+ self.poll_thread.join()
+ if not self.check_on_exit:
+ self.client.close()
+ return
+ # If ImpalaServer::UnregisterQuery() happens before the last polling, the
error
+ # message will be "Invalid or unknown query handle". Otherwise, the error
message
+ # will be "Cancelled".
+ assert error_msg_expected(
+ str(self.exc),
+ "Invalid or unknown query handle",
+ self.client.get_query_id(self.handle),
+ ) or error_msg_expected(
+ str(self.exc),
+ "Cancelled",
+ self.client.get_query_id(self.handle),
+ )
+ try:
+ self.client.fetch(self.sql, self.handle)
+ except Exception as ex:
+ assert "Invalid or unknown query handle" in str(ex)
+ finally:
+ self.client.close()
+
+
+def assert_kill_ok(client, query_id, user=None):
+ sql = "KILL QUERY '{0}'".format(query_id)
+ result = client.execute(sql, user=user)
+ assert result.success and len(result.data) == 1
+ assert result.data[0] == "Query {0} is killed.".format(query_id)
+
+
+def assert_kill_error(client, error_msg, query_id=None, sql=None, user=None):
+ if sql is None:
+ sql = "KILL QUERY '{0}'".format(query_id)
+ try:
+ client.execute(sql, user=user)
+ assert False, "Failed to catch the exception."
+ except Exception as exc:
+ assert error_msg_expected(str(exc), error_msg)
def cancel_query_and_validate_state(client, query, exec_option, table_format,
- cancel_delay, join_before_close=False):
+ cancel_delay, join_before_close=False, use_kill_query_statement=False):
"""Runs the given query asynchronously and then cancels it after the
specified delay.
The query is run with the given 'exec_options' against the specified
'table_format'. A
separate async thread is launched to fetch the results of the query. The
method
validates that the query was successfully cancelled and that the error
messages for the
calls to ImpalaConnection#fetch and #close are consistent. If
'join_before_close' is
True the method will join against the fetch results thread before closing
the query.
+
+ If 'use_kill_query_statement' is True and 'join_before_close' is False, a
KILL QUERY
+ statement will be executed to cancel and close the query, instead of sending
the Thrift
+ RPCs directly.
"""
+ assert not (join_before_close and use_kill_query_statement)
+
if table_format: ImpalaTestSuite.change_database(client, table_format)
if exec_option: client.set_configuration(exec_option)
handle = client.execute_async(query)
@@ -51,18 +124,30 @@ def cancel_query_and_validate_state(client, query,
exec_option, table_format,
if "Query Status:" in line:
error_msg += line
assert False, error_msg
- cancel_result = client.cancel(handle)
- assert cancel_result.status_code == 0,\
- 'Unexpected status code from cancel request: %s' % cancel_result
+ if use_kill_query_statement:
+ with create_connection(
+ host_port=client.get_host_port(),
+ protocol=client.get_test_protocol(),
+ ) as kill_client:
+ kill_client.connect()
+ if exec_option:
+ kill_client.set_configuration(exec_option)
+ assert_kill_ok(kill_client, client.get_query_id(handle))
+ else:
+ cancel_result = client.cancel(handle)
+ assert cancel_result.status_code == 0, \
+ 'Unexpected status code from cancel request: %s' % cancel_result
if join_before_close:
thread.join()
close_error = None
- try:
- client.close_query(handle)
- except ImpalaBeeswaxException as e:
- close_error = e
+ # The KILL QUERY statement will also close the query.
+ if not use_kill_query_statement:
+ try:
+ client.close_query(handle)
+ except ImpalaBeeswaxException as e:
+ close_error = e
# Before accessing fetch_results_error we need to join the fetch thread
thread.join()