This is an automated email from the ASF dual-hosted git repository.
stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 6bfa86f IMPALA-6894: Use an internal representation of query states
in ClientRequestState
6bfa86f is described below
commit 6bfa86faa06ce053d9c88c2c291af5a3c0e119c3
Author: Sahil Takiar <[email protected]>
AuthorDate: Fri Nov 8 17:16:49 2019 -0800
IMPALA-6894: Use an internal representation of query states in
ClientRequestState
Re-factors the state machine of ClientRequestState so that it uses an
internal state representation rather than using the one defined by
TOperationState. The possible states are defined in
ClientRequestState::ExecState and the possible state transitions are
outlined in client-request-state.h and enforced in
ClientRequestState::UpdateNonErrorExecState. The states defined in
ClientRequestState::ExecState are the same states currently used in
TOperationState. This patch simply makes it easy to define new states
in the future.
The value of ClientRequestState::ExecState is exposed to clients via the
entry "Impala Query State" in the runtime profile. It is meant to be the
Impala specific version of "Query State" (which is the Beeswax state).
This allows Impala to expose its internal state without breaking existing
clients that might rely on the value of "Query State".
Additional Bug Fixes:
* Previously, UpdateNonErrorOperationState would silently ignore attempts to
make illegal state transitions. The method (now UpdateNonErrorExecState) uses
DCHECKs to ensure only valid state transitions are attempted. This required
fixing a possible race condition where a query could transition from RUNNING
to PENDING.
* The ClientRequestState state is now tracked using an AtomicEnum, which
fixes a few possible race conditions where the state was being read
without holding the ClientRequestState::lock_
Testing:
* Ran core tests
* Added test to make sure "Impala Query State" is populated
Change-Id: I1ce70bd2e964b309ebfc9d6ff6d900485db4d630
Reviewed-on: http://gerrit.cloudera.org:8080/14744
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Sahil Takiar <[email protected]>
---
be/src/service/client-request-state.cc | 119 ++++++++++++++++++++++----------
be/src/service/client-request-state.h | 55 +++++++++------
be/src/service/impala-beeswax-server.cc | 4 +-
be/src/service/impala-hs2-server.cc | 6 +-
be/src/service/impala-http-handler.cc | 8 +--
be/src/service/impala-server.cc | 6 +-
tests/common/impala_test_suite.py | 13 ++--
tests/query_test/test_observability.py | 88 ++++++++++++++++-------
8 files changed, 200 insertions(+), 99 deletions(-)
diff --git a/be/src/service/client-request-state.cc
b/be/src/service/client-request-state.cc
index 8619060..023bddc 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -136,6 +136,8 @@ ClientRequestState::ClientRequestState(
summary_profile_->AddInfoString("End Time", "");
summary_profile_->AddInfoString("Query Type", "N/A");
summary_profile_->AddInfoString("Query State",
PrintThriftEnum(BeeswaxQueryState()));
+ summary_profile_->AddInfoString(
+ "Impala Query State", ExecStateToString(exec_state_.Load()));
summary_profile_->AddInfoString("Query Status", "OK");
summary_profile_->AddInfoString("Impala Version", GetVersionString(/*
compact */ true));
summary_profile_->AddInfoString("User", effective_user());
@@ -267,17 +269,16 @@ Status ClientRequestState::Exec() {
}
case TStmtType::ADMIN_FN:
DCHECK(exec_request_.admin_request.type == TAdminRequestType::SHUTDOWN);
- return ExecShutdownRequest();
+ RETURN_IF_ERROR(ExecShutdownRequest());
+ break;
default:
stringstream errmsg;
errmsg << "Unknown exec request stmt type: " << exec_request_.stmt_type;
return Status(errmsg.str());
}
- if (async_exec_thread_.get() != nullptr) {
- UpdateNonErrorOperationState(TOperationState::PENDING_STATE);
- } else {
- UpdateNonErrorOperationState(TOperationState::RUNNING_STATE);
+ if (async_exec_thread_.get() == nullptr) {
+ UpdateNonErrorExecState(ExecState::RUNNING);
}
return Status::OK();
}
@@ -499,6 +500,9 @@ Status ClientRequestState::ExecAsyncQueryOrDmlRequest(
// Don't start executing the query if Cancel() was called concurrently
with Exec().
if (is_cancelled_) return Status::CANCELLED;
}
+ // Don't transition to PENDING inside the FinishExecQueryOrDmlRequest thread
because
+ // the query should be in the PENDING state before the Exec RPC returns.
+ UpdateNonErrorExecState(ExecState::PENDING);
RETURN_IF_ERROR(Thread::Create("query-exec-state", "async-exec-thread",
&ClientRequestState::FinishExecQueryOrDmlRequest, this,
&async_exec_thread_, true));
return Status::OK();
@@ -559,7 +563,7 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() {
}
profile_->AddChild(coord_->query_profile());
- UpdateNonErrorOperationState(TOperationState::RUNNING_STATE);
+ UpdateNonErrorExecState(ExecState::RUNNING);
}
Status ClientRequestState::ExecDdlRequest() {
@@ -774,6 +778,7 @@ Status ClientRequestState::Exec(const TMetadataOpRequest&
exec_request) {
&metadata_op_result));
result_metadata_ = metadata_op_result.schema;
request_result_set_.reset(new vector<TResultRow>(metadata_op_result.rows));
+ UpdateNonErrorExecState(ExecState::RUNNING);
return Status::OK();
}
@@ -825,11 +830,11 @@ void ClientRequestState::Wait() {
if (stmt_type() == TStmtType::DDL) {
DCHECK(catalog_op_type() != TCatalogOpType::DDL || request_result_set_
!= nullptr);
}
- UpdateNonErrorOperationState(TOperationState::FINISHED_STATE);
+ UpdateNonErrorExecState(ExecState::FINISHED);
}
- // UpdateQueryStatus() or UpdateNonErrorOperationState() have updated
operation_state_.
- DCHECK(operation_state_ == TOperationState::FINISHED_STATE ||
- operation_state_ == TOperationState::ERROR_STATE);
+ // UpdateQueryStatus() or UpdateNonErrorExecState() have updated exec_state_.
+ ExecState exec_state = exec_state_.Load();
+ DCHECK(exec_state == ExecState::FINISHED || exec_state == ExecState::ERROR);
// Notify all the threads blocked on Wait() to finish and then log the query
events,
// if any.
{
@@ -917,31 +922,47 @@ Status ClientRequestState::RestartFetch() {
return Status::OK();
}
-void ClientRequestState::UpdateNonErrorOperationState(TOperationState::type
new_state) {
+void ClientRequestState::UpdateNonErrorExecState(ExecState new_state) {
lock_guard<mutex> l(lock_);
+ ExecState old_state = exec_state_.Load();
+ static string error_msg = "Illegal state transition: $0 -> $1";
switch (new_state) {
- case TOperationState::PENDING_STATE:
- if (operation_state_ == TOperationState::INITIALIZED_STATE) {
- UpdateOperationState(new_state);
- }
+ case ExecState::PENDING:
+ DCHECK(old_state == ExecState::INITIALIZED) << Substitute(
+ error_msg, ExecStateToString(old_state),
ExecStateToString(new_state));
+ UpdateExecState(new_state);
break;
- case TOperationState::RUNNING_STATE:
- case TOperationState::FINISHED_STATE:
- if (operation_state_ == TOperationState::INITIALIZED_STATE
- || operation_state_ == TOperationState::PENDING_STATE
- || operation_state_ == TOperationState::RUNNING_STATE) {
- UpdateOperationState(new_state);
+ case ExecState::RUNNING:
+ // It is possible for FinishExecQueryOrDmlRequest() to attempt a
transition to
+ // running, even after the query has been cancelled with an error status
(and is
+ // thus in the ERROR ExecState). In this case, just ignore the
transition attempt.
+ if (old_state != ExecState::ERROR) {
+ // DDL statements and metadata ops don't use the PENDING state, so a
query can
+ // transition directly from the INITIALIZED to RUNNING state.
+ DCHECK(old_state == ExecState::INITIALIZED || old_state ==
ExecState::PENDING)
+ << Substitute(
+ error_msg, ExecStateToString(old_state),
ExecStateToString(new_state));
+ UpdateExecState(new_state);
}
break;
+ case ExecState::FINISHED:
+ // A query can transition from PENDING to FINISHED if it is cancelled by
the
+ // client.
+ DCHECK(old_state == ExecState::PENDING || old_state ==
ExecState::RUNNING)
+ << Substitute(
+ error_msg, ExecStateToString(old_state),
ExecStateToString(new_state));
+ UpdateExecState(new_state);
+ break;
default:
- DCHECK(false) << "A non-error state expected but got: " << new_state;
+ DCHECK(false) << "A non-error state expected but got: "
+ << ExecStateToString(new_state);
}
}
Status ClientRequestState::UpdateQueryStatus(const Status& status) {
// Preserve the first non-ok status
if (!status.ok() && query_status_.ok()) {
- UpdateOperationState(TOperationState::ERROR_STATE);
+ UpdateExecState(ExecState::ERROR);
query_status_ = status;
summary_profile_->AddInfoStringRedacted("Query Status",
query_status_.GetDetail());
}
@@ -951,9 +972,9 @@ Status ClientRequestState::UpdateQueryStatus(const Status&
status) {
Status ClientRequestState::FetchRowsInternal(const int32_t max_rows,
QueryResultSet* fetched_rows, int64_t block_on_wait_time_us) {
- // Wait() guarantees that we've transitioned at least to FINISHED_STATE (and
any
+ // Wait() guarantees that we've transitioned at least to FINISHED state (and
any
// state beyond that should have a non-OK query_status_ set).
- DCHECK(operation_state_ == TOperationState::FINISHED_STATE);
+ DCHECK(exec_state_.Load() == ExecState::FINISHED);
if (eos_) return Status::OK();
@@ -1089,12 +1110,12 @@ Status ClientRequestState::Cancel(bool check_inflight,
const Status* cause) {
{
lock_guard<mutex> lock(lock_);
// If the query has reached a terminal state, no need to update the state.
- bool already_done = eos_ || operation_state_ ==
TOperationState::ERROR_STATE;
+ bool already_done = eos_ || exec_state_.Load() == ExecState::ERROR;
if (!already_done && cause != NULL) {
DCHECK(!cause->ok());
discard_result(UpdateQueryStatus(*cause));
query_events_->MarkEvent("Cancelled");
- DCHECK_EQ(operation_state_, TOperationState::ERROR_STATE);
+ DCHECK(exec_state_.Load() == ExecState::ERROR);
}
admit_outcome_.Set(AdmissionOutcome::CANCELLED);
@@ -1335,21 +1356,36 @@ void ClientRequestState::ClearResultCache() {
result_cache_.reset();
}
-void ClientRequestState::UpdateOperationState(
- TOperationState::type operation_state) {
- operation_state_ = operation_state;
+void ClientRequestState::UpdateExecState(ExecState exec_state) {
+ exec_state_.Store(exec_state);
summary_profile_->AddInfoString("Query State",
PrintThriftEnum(BeeswaxQueryState()));
+ summary_profile_->AddInfoString("Impala Query State",
ExecStateToString(exec_state));
+}
+
+apache::hive::service::cli::thrift::TOperationState::type
+ClientRequestState::TOperationState() const {
+ switch (exec_state_.Load()) {
+ case ExecState::INITIALIZED: return TOperationState::INITIALIZED_STATE;
+ case ExecState::PENDING: return TOperationState::PENDING_STATE;
+ case ExecState::RUNNING: return TOperationState::RUNNING_STATE;
+ case ExecState::FINISHED: return TOperationState::FINISHED_STATE;
+ case ExecState::ERROR: return TOperationState::ERROR_STATE;
+ default: {
+ DCHECK(false) << "Add explicit translation for all used ExecState
values";
+ return TOperationState::ERROR_STATE;
+ }
+ }
}
beeswax::QueryState::type ClientRequestState::BeeswaxQueryState() const {
- switch (operation_state_) {
- case TOperationState::INITIALIZED_STATE: return
beeswax::QueryState::CREATED;
- case TOperationState::PENDING_STATE: return beeswax::QueryState::COMPILED;
- case TOperationState::RUNNING_STATE: return beeswax::QueryState::RUNNING;
- case TOperationState::FINISHED_STATE: return beeswax::QueryState::FINISHED;
- case TOperationState::ERROR_STATE: return beeswax::QueryState::EXCEPTION;
+ switch (exec_state_.Load()) {
+ case ExecState::INITIALIZED: return beeswax::QueryState::CREATED;
+ case ExecState::PENDING: return beeswax::QueryState::COMPILED;
+ case ExecState::RUNNING: return beeswax::QueryState::RUNNING;
+ case ExecState::FINISHED: return beeswax::QueryState::FINISHED;
+ case ExecState::ERROR: return beeswax::QueryState::EXCEPTION;
default: {
- DCHECK(false) << "Add explicit translation for all used TOperationState
values";
+ DCHECK(false) << "Add explicit translation for all used ExecState
values";
return beeswax::QueryState::EXCEPTION;
}
}
@@ -1616,4 +1652,13 @@ Status ClientRequestState::LogLineageRecord() {
return Status::OK();
}
+string ClientRequestState::ExecStateToString(ExecState state) const {
+ static const unordered_map<ClientRequestState::ExecState, const char*>
+ exec_state_strings{{ClientRequestState::ExecState::INITIALIZED,
"INITIALIZED"},
+ {ClientRequestState::ExecState::PENDING, "PENDING"},
+ {ClientRequestState::ExecState::RUNNING, "RUNNING"},
+ {ClientRequestState::ExecState::FINISHED, "FINISHED"},
+ {ClientRequestState::ExecState::ERROR, "ERROR"}};
+ return exec_state_strings.at(state);
+}
}
diff --git a/be/src/service/client-request-state.h
b/be/src/service/client-request-state.h
index 57f7a03..3584e67 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -58,6 +58,15 @@ enum class AdmissionOutcome;
/// synchronize access explicitly via lock(). See the ImpalaServer class
comment for
/// the required lock acquisition order.
///
+/// State Machine:
+/// ExecState represents all possible states and UpdateNonErrorExecState /
+/// UpdateQueryStatus defines all possible state transitions. ExecState starts
out in the
+/// INITIALIZED state and eventually transition to either the FINISHED or
ERROR state. Any
+/// state can transition to the ERROR state (if an error is hit or a timeout
occurs). The
+/// possible state transitions differ for a query depending on its type (e.g.
depending on
+/// the TStmtType). Successful QUERY / DML queries transition from INITIALIZED
to PENDING
+/// to RUNNING to FINISHED whereas DDL queries skip the PENDING phase.
+///
/// TODO: Compute stats is the only stmt that requires child queries. Once the
/// CatalogService performs background stats gathering the concept of child
queries
/// will likely become obsolete. Remove all child-query related code from this
class.
@@ -68,6 +77,10 @@ class ClientRequestState {
~ClientRequestState();
+ enum class ExecState {
+ INITIALIZED, PENDING, RUNNING, FINISHED, ERROR
+ };
+
/// Sets the profile that is produced by the frontend. The frontend creates
the
/// profile during planning and returns it to the backend via TExecRequest,
/// which then sets the frontend profile.
@@ -134,11 +147,10 @@ class ClientRequestState {
Status RestartFetch() WARN_UNUSED_RESULT;
/// Update operation state if the requested state isn't already obsolete.
This is
- /// only for non-error states (PENDING_STATE, RUNNING_STATE and
FINISHED_STATE) - if the
- /// query encounters an error the query status needs to be set with
information about
- /// the error so UpdateQueryStatus() must be used instead. Takes lock_.
- void UpdateNonErrorOperationState(
- apache::hive::service::cli::thrift::TOperationState::type
operation_state);
+ /// only for non-error states (PENDING, RUNNING and FINISHED) - if the query
encounters
+ /// an error the query status needs to be set with information about the
error so
+ /// UpdateQueryStatus() must be used instead. Takes lock_.
+ void UpdateNonErrorExecState(ExecState exec_state);
/// Update the query status and the "Query Status" summary profile string.
/// If current status is already != ok, no update is made (we preserve the
first error)
@@ -152,8 +164,8 @@ class ClientRequestState {
/// Cancels the child queries and the coordinator with the given cause.
/// If cause is NULL, it assume this was deliberately cancelled by the user
while in
- /// FINISHED operation state. Otherwise, sets state to ERROR_STATE (TODO:
IMPALA-1262:
- /// use CANCELED_STATE). Does nothing if the query has reached EOS or
already cancelled.
+ /// FINISHED state. Otherwise, sets state to ERROR (TODO: IMPALA-1262: use
CANCELLED).
+ /// Does nothing if the query has reached EOS or already cancelled.
///
/// Only returns an error if 'check_inflight' is true and the query is not
yet
/// in-flight. Otherwise, proceed and return Status::OK() even if the query
isn't
@@ -242,11 +254,11 @@ class ClientRequestState {
}
boost::mutex* lock() { return &lock_; }
boost::mutex* fetch_rows_lock() { return &fetch_rows_lock_; }
- apache::hive::service::cli::thrift::TOperationState::type operation_state()
const {
- return operation_state_;
- }
- // Translate operation_state() to a beeswax::QueryState. TODO: remove calls
to this
- // and replace with uses of operation_state() directly.
+ /// ExecState is stored using an AtomicEnum, so reads do not require holding
lock_.
+ ExecState exec_state() const { return exec_state_.Load(); }
+ /// Translate exec_state_ to a TOperationState.
+ apache::hive::service::cli::thrift::TOperationState::type TOperationState()
const;
+ /// Translate exec_state_ to a beeswax::QueryState.
beeswax::QueryState::type BeeswaxQueryState() const;
const Status& query_status() const { return query_status_; }
void set_result_metadata(const TResultSetMetadata& md) { result_metadata_ =
md; }
@@ -459,11 +471,10 @@ protected:
bool is_cancelled_ = false; // if true, Cancel() was called.
bool eos_ = false; // if true, there are no more rows to return
- /// We enforce the invariant that query_status_ is not OK iff
operation_state_ is
- /// ERROR_STATE, given that lock_ is held. operation_state_ should only be
updated
- /// using UpdateOperationState(), to ensure that the query profile is also
updated.
- apache::hive::service::cli::thrift::TOperationState::type operation_state_ =
- apache::hive::service::cli::thrift::TOperationState::INITIALIZED_STATE;
+ /// We enforce the invariant that query_status_ is not OK iff exec_state_ is
ERROR,
+ /// given that lock_ is held. exec_state_ should only be updated using
+ /// UpdateExecState(), to ensure that the query profile is also updated.
+ AtomicEnum<ExecState> exec_state_{ExecState::INITIALIZED};
/// The current status of the query tracked by this ClientRequestState.
Updated by
/// UpdateQueryStatus(Status).
@@ -594,10 +605,9 @@ protected:
void ClearResultCache();
/// Update the operation state and the "Query State" summary profile string.
- /// Does not take lock_, but requires it: caller must ensure lock_
- /// is taken before calling UpdateOperationState.
- void UpdateOperationState(
- apache::hive::service::cli::thrift::TOperationState::type
operation_state);
+ /// Does not take lock_, but requires it: caller must ensure lock_ is taken
before
+ /// calling UpdateExecState.
+ void UpdateExecState(ExecState exec_state);
/// Gets the query options, their levels and the values for this client
request
/// and populates the result set with them. It covers the subset of options
for
@@ -628,6 +638,9 @@ protected:
/// Logs audit and column lineage events. Expects that Wait() has already
finished.
/// Grabs lock_ for polling the query_status(). Hence do not call it under
lock_.
void LogQueryEvents();
+
+ /// Converts the given ExecState to a string representation.
+ std::string ExecStateToString(ExecState state) const;
};
}
diff --git a/be/src/service/impala-beeswax-server.cc
b/be/src/service/impala-beeswax-server.cc
index 4304ce3..3c90cc2 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -324,10 +324,10 @@ void ImpalaServer::get_log(string& log, const
LogContextId& context) {
stringstream error_log_ss;
{
- // Take the lock to ensure that if the client sees a query_state ==
EXCEPTION, it is
+ // Take the lock to ensure that if the client sees a exec_state == ERROR,
it is
// guaranteed to see the error query_status.
lock_guard<mutex> l(*request_state->lock());
- DCHECK_EQ(request_state->BeeswaxQueryState() ==
beeswax::QueryState::EXCEPTION,
+ DCHECK_EQ(request_state->exec_state() ==
ClientRequestState::ExecState::ERROR,
!request_state->query_status().ok());
// If the query status is !ok, include the status error message at the top
of the log.
if (!request_state->query_status().ok()) {
diff --git a/be/src/service/impala-hs2-server.cc
b/be/src/service/impala-hs2-server.cc
index d219eb4..f8e5dc0 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -162,7 +162,7 @@ void ImpalaServer::ExecuteMetadataOp(const
THandleIdentifier& session_handle,
return;
}
- request_state->UpdateNonErrorOperationState(TOperationState::FINISHED_STATE);
+
request_state->UpdateNonErrorExecState(ClientRequestState::ExecState::FINISHED);
Status inflight_status = SetQueryInflight(session, request_state);
if (!inflight_status.ok()) {
@@ -723,7 +723,7 @@ void
ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
{
lock_guard<mutex> l(*request_state->lock());
- TOperationState::type operation_state = request_state->operation_state();
+ TOperationState::type operation_state = request_state->TOperationState();
return_val.__set_operationState(operation_state);
if (operation_state == TOperationState::ERROR_STATE) {
DCHECK(!request_state->query_status().ok());
@@ -938,7 +938,7 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const
TGetLogReq& request) {
// guaranteed to see the error query_status.
lock_guard<mutex> l(*request_state->lock());
Status query_status = request_state->query_status();
- DCHECK_EQ(request_state->operation_state() == TOperationState::ERROR_STATE,
+ DCHECK_EQ(request_state->exec_state() ==
ClientRequestState::ExecState::ERROR,
!query_status.ok());
// If the query status is !ok, include the status error message at the top
of the log.
if (!query_status.ok()) ss << query_status.GetDetail();
diff --git a/be/src/service/impala-http-handler.cc
b/be/src/service/impala-http-handler.cc
index 5ce4280..c3bac80 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -847,7 +847,7 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool
include_json_plan, bool include
// state lock to be acquired, since it could potentially be an expensive
// call, if the table Catalog metadata loading is in progress. Instead
// update the caller that the plan information is unavailable.
- if (request_state->operation_state() ==
TOperationState::INITIALIZED_STATE) {
+ if (request_state->exec_state() ==
ClientRequestState::ExecState::INITIALIZED) {
document->AddMember(
"plan_metadata_unavailable", "true", document->GetAllocator());
return;
@@ -996,9 +996,9 @@ void ImpalaHttpHandler::AdmissionStateHandler(
server_->client_request_state_map_.DoFuncForAllEntries([&running_queries](
const std::shared_ptr<ClientRequestState>& request_state) {
// Make sure only queries past admission control are added.
- auto query_state = request_state->operation_state();
- if (query_state != TOperationState::INITIALIZED_STATE
- && query_state != TOperationState::PENDING_STATE
+ auto query_state = request_state->exec_state();
+ if (query_state != ClientRequestState::ExecState::INITIALIZED
+ && query_state != ClientRequestState::ExecState::PENDING
&& request_state->schedule() != nullptr)
running_queries[request_state->request_pool()].push_back(
{request_state->query_id(),
request_state->schedule()->per_backend_mem_limit(),
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 76219be..11ea058 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -621,8 +621,8 @@ Status ImpalaServer::GetRuntimeProfileOutput(const
TUniqueId& query_id,
{
shared_ptr<ClientRequestState> request_state =
GetClientRequestState(query_id);
if (request_state.get() != nullptr) {
- // For queries in INITIALIZED_STATE, the profile information isn't
populated yet.
- if (request_state->operation_state() ==
TOperationState::INITIALIZED_STATE) {
+ // For queries in INITIALIZED state, the profile information isn't
populated yet.
+ if (request_state->exec_state() ==
ClientRequestState::ExecState::INITIALIZED) {
return Status::Expected("Query plan is not ready.");
}
lock_guard<mutex> l(*request_state->lock());
@@ -691,7 +691,7 @@ Status ImpalaServer::GetExecSummary(const TUniqueId&
query_id, const string& use
lock_guard<mutex> l(*request_state->lock());
RETURN_IF_ERROR(CheckProfileAccess(user, request_state->effective_user(),
request_state->user_has_profile_access()));
- if (request_state->operation_state() == TOperationState::PENDING_STATE) {
+ if (request_state->exec_state() ==
ClientRequestState::ExecState::PENDING) {
const string* admission_result =
request_state->summary_profile()->GetInfoString(
AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT);
if (admission_result != nullptr) {
diff --git a/tests/common/impala_test_suite.py
b/tests/common/impala_test_suite.py
index 79c3be5..8f5066e 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -242,7 +242,6 @@ class ImpalaTestSuite(BaseTestSuite):
return client
@classmethod
-
def get_impalad_cluster_size(cls):
return len(cls.__get_cluster_host_ports('beeswax'))
@@ -1125,19 +1124,23 @@ class ImpalaTestSuite(BaseTestSuite):
raise Exception("Table {0}.{1} didn't show up after {2}s", db_name,
table_name,
timeout_s)
- def assert_eventually(self, timeout_s, period_s, condition):
+ def assert_eventually(self, timeout_s, period_s, condition, error_msg=None):
"""Assert that the condition (a function with no parameters) returns True
within the
given timeout. The condition is executed every period_s seconds. The check
assumes
that once the condition returns True, it continues to return True. Throws
a Timeout
- if the condition does not return true within timeout_s seconds."""
+ if the condition does not return true within timeout_s seconds.
'error_msg' is an
+ optional function that must return a string. If set, the result of the
function will
+ be included in the Timeout error message."""
count = 0
start_time = time.time()
while not condition() and time.time() - start_time < timeout_s:
time.sleep(period_s)
count += 1
if not condition():
- raise Timeout("Check failed to return True after {0} tries and {1}
seconds".format(
- count, timeout_s))
+ error_msg_str = " error message: " + error_msg() if error_msg else ""
+ raise Timeout(
+ "Check failed to return True after {0} tries and {1}
seconds{2}".format(
+ count, timeout_s, error_msg_str))
def assert_impalad_log_contains(self, level, line_regex, expected_count=1):
"""
diff --git a/tests/query_test/test_observability.py
b/tests/query_test/test_observability.py
index 9087852..101f086 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -17,17 +17,17 @@
from collections import defaultdict
from datetime import datetime
-from tests.common.impala_cluster import ImpalaCluster
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import (SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon,
SkipIfLocal, SkipIfNotHdfsMinicluster)
from tests.util.filesystem_utils import IS_EC
-from time import sleep, time
+from time import sleep
from RuntimeProfile.ttypes import TRuntimeProfileFormat
-import logging
import pytest
import re
+
class TestObservability(ImpalaTestSuite):
@classmethod
def get_workload(self):
@@ -158,27 +158,6 @@ class TestObservability(ImpalaTestSuite):
assert result.exec_summary[0]['peak_mem'] >= 0
assert result.exec_summary[0]['est_peak_mem'] >= 0
- def test_query_states(self):
- """Tests that the query profile shows expected query states."""
- query = "select count(*) from functional.alltypes"
- handle = self.execute_query_async(query,
- {"debug_action": "CRS_BEFORE_ADMISSION:SLEEP@1000"})
- # If ExecuteStatement() has completed and the query is paused in the
admission control
- # phase, then the query must be in COMPILED state.
- profile = self.client.get_runtime_profile(handle)
- assert "Query State: COMPILED" in profile
- # After completion of the admission control phase, the query must have at
least
- # reached RUNNING state.
- self.client.wait_for_admission_control(handle)
- profile = self.client.get_runtime_profile(handle)
- assert "Query State: RUNNING" in profile or \
- "Query State: FINISHED" in profile, profile
-
- results = self.client.fetch(query, handle)
- profile = self.client.get_runtime_profile(handle)
- # After fetching the results, the query must be in state FINISHED.
- assert "Query State: FINISHED" in profile, profile
-
def test_query_options(self):
"""Test that the query profile shows expected non-default query options,
both set
explicitly through client and those set by planner"""
@@ -747,3 +726,64 @@ class TestObservability(ImpalaTestSuite):
result = self.execute_query(query)
assert result.success
self.__verify_hashtable_stats_profile(result.runtime_profile)
+
+
+class TestQueryStates(ImpalaTestSuite):
+ """Test that the 'Query State' and 'Impala Query State' are set correctly in
the
+ runtime profile."""
+
+ @classmethod
+ def get_workload(self):
+ return 'functional-query'
+
+ def test_query_states(self):
+ """Tests that the query profile shows expected query states."""
+ query = "select count(*) from functional.alltypes where bool_col =
sleep(10)"
+ handle = self.execute_query_async(query,
+ {"debug_action": "CRS_BEFORE_ADMISSION:SLEEP@1000"})
+ # If ExecuteStatement() has completed and the query is paused in the
admission control
+ # phase, then the query must be in COMPILED state.
+ profile = self.client.get_runtime_profile(handle)
+ assert self.__is_line_in_profile("Query State: COMPILED", profile)
+ assert self.__is_line_in_profile("Impala Query State: PENDING", profile)
+ # After completion of the admission control phase, the query must have at
least
+ # reached RUNNING state.
+ self.client.wait_for_admission_control(handle)
+ profile = self.client.get_runtime_profile(handle)
+ assert self.__is_line_in_profile("Query State: RUNNING", profile), profile
+ assert self.__is_line_in_profile("Impala Query State: RUNNING", profile),
profile
+
+ self.client.fetch(query, handle)
+ profile = self.client.get_runtime_profile(handle)
+ # After fetching the results, the query must be in state FINISHED.
+ assert self.__is_line_in_profile("Query State: FINISHED", profile), profile
+ assert self.__is_line_in_profile("Impala Query State: FINISHED", profile),
profile
+
+ def test_error_query_state(self):
+ """Tests that the query profile shows the proper error state."""
+ query = "select * from functional.alltypes limit 10"
+ handle = self.execute_query_async(query, {"abort_on_error": "1",
+ "debug_action":
"0:GETNEXT:FAIL"})
+
+ def assert_finished():
+ profile = self.client.get_runtime_profile(handle)
+ return self.__is_line_in_profile("Query State: FINISHED", profile) and \
+ self.__is_line_in_profile("Impala Query State: FINISHED", profile)
+
+ self.assert_eventually(30, 1, assert_finished,
+ lambda: self.client.get_runtime_profile(handle))
+
+ try:
+ self.client.fetch(query, handle)
+ assert False
+ except ImpalaBeeswaxException:
+ pass
+
+ profile = self.client.get_runtime_profile(handle)
+ assert self.__is_line_in_profile("Query State: EXCEPTION", profile),
profile
+ assert self.__is_line_in_profile("Impala Query State: ERROR", profile),
profile
+
+ def __is_line_in_profile(self, line, profile):
+ """Returns true if the given 'line' is in the given 'profile'. A single
line of the
+ profile must exactly match the given 'line' (excluding whitespaces)."""
+ return re.search("^\s*{0}\s*$".format(line), profile, re.M)