IMPALA-5340: Query profile displays stale query state Previously, updates to the query state in ClientRequestState were not immediately reflected in the query profile, potentially leading to the profile showing an incorrect state for an extended period during execution.
In particular, queries were being shown in the 'CREATED' state long after they had started 'RUNNING'. The fix is to update the profile whenever the state is updated. Testing: - Extended existing hs2 tests and added a beeswax test to check for expected query states in the profile Change-Id: I952319b7308a24d4e2dff924199c0c771bce25b3 Reviewed-on: http://gerrit.cloudera.org:8080/6923 Reviewed-by: Dan Hecht <[email protected]> Reviewed-by: Thomas Tauber-Marshall <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ee9fbeca Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ee9fbeca Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ee9fbeca Branch: refs/heads/master Commit: ee9fbeca9041349d3532d315bf07e77d784a846d Parents: 3610533 Author: Thomas Tauber-Marshall <[email protected]> Authored: Thu May 18 12:50:48 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Sat May 20 03:17:59 2017 +0000 ---------------------------------------------------------------------- be/src/service/client-request-state.cc | 18 +++++++----- be/src/service/client-request-state.h | 10 +++++-- tests/hs2/test_hs2.py | 45 ++++++++++++++++++++++++++--- tests/query_test/test_observability.py | 15 ++++++++++ 4 files changed, 75 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee9fbeca/be/src/service/client-request-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 26bb3f4..42e8de3 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -148,7 +148,6 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) { 
profile_.AddChild(&server_profile_); summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type())); - summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); summary_profile_.AddInfoString("Query Options (non default)", DebugQueryOptions(query_ctx_.client_request.query_options)); @@ -571,7 +570,6 @@ void ClientRequestState::Done() { unique_lock<mutex> l(lock_); end_time_ = TimestampValue::LocalTime(); summary_profile_.AddInfoString("End Time", end_time().ToString()); - summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); query_events_->MarkEvent("Unregister query"); // Update result set cache metrics, and update mem limit accounting before tearing @@ -595,7 +593,6 @@ Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) { TResultSet metadata_op_result; // Like the other Exec(), fill out as much profile information as we're able to. summary_profile_.AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL)); - summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request, &metadata_op_result)); result_metadata_ = metadata_op_result.schema; @@ -723,13 +720,13 @@ void ClientRequestState::UpdateNonErrorQueryState( beeswax::QueryState::type query_state) { lock_guard<mutex> l(lock_); DCHECK(query_state != beeswax::QueryState::EXCEPTION); - if (query_state_ < query_state) query_state_ = query_state; + if (query_state_ < query_state) UpdateQueryState(query_state); } Status ClientRequestState::UpdateQueryStatus(const Status& status) { // Preserve the first non-ok status if (!status.ok() && query_status_.ok()) { - query_state_ = beeswax::QueryState::EXCEPTION; + UpdateQueryState(beeswax::QueryState::EXCEPTION); query_status_ = status; summary_profile_.AddInfoString("Query Status", query_status_.GetDetail()); } @@ -744,7 +741,7 @@ Status ClientRequestState::FetchRowsInternal(const int32_t max_rows, if (eos_) return 
Status::OK(); if (request_result_set_ != NULL) { - query_state_ = beeswax::QueryState::FINISHED; + UpdateQueryState(beeswax::QueryState::FINISHED); int num_rows = 0; const vector<TResultRow>& all_rows = (*(request_result_set_.get())); // max_rows <= 0 means no limit @@ -772,7 +769,8 @@ Status ClientRequestState::FetchRowsInternal(const int32_t max_rows, if (num_rows_fetched_from_cache >= max_rows) return Status::OK(); } - query_state_ = beeswax::QueryState::FINISHED; // results will be ready after this call + // results will be ready after this call + UpdateQueryState(beeswax::QueryState::FINISHED); // Maximum number of rows to be fetched from the coord. int32_t max_coord_rows = max_rows; @@ -1082,4 +1080,10 @@ void ClientRequestState::ClearResultCache() { } result_cache_.reset(NULL); } + +void ClientRequestState::UpdateQueryState( + beeswax::QueryState::type query_state) { + query_state_ = query_state; + summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); +} } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee9fbeca/be/src/service/client-request-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h index 0e18957..5990b88 100644 --- a/be/src/service/client-request-state.h +++ b/be/src/service/client-request-state.h @@ -309,8 +309,9 @@ class ClientRequestState { bool is_cancelled_; // if true, Cancel() was called. bool eos_; // if true, there are no more rows to return - // We enforce the invariant that query_status_ is not OK iff query_state_ - // is EXCEPTION, given that lock_ is held. + /// We enforce the invariant that query_status_ is not OK iff query_state_ is EXCEPTION, + /// given that lock_ is held. query_state_ should only be updated using + /// UpdateQueryState(), to ensure that the query profile is also updated. 
beeswax::QueryState::type query_state_; Status query_status_; TExecRequest exec_request_; @@ -407,6 +408,11 @@ class ClientRequestState { /// Sets result_cache_ to NULL and updates its associated metrics and mem consumption. /// This function is a no-op if the cache has already been cleared. void ClearResultCache(); + + /// Update the query state and the "Query State" summary profile string. + /// Does not take lock_, but requires it: caller must ensure lock_ + /// is taken before calling UpdateQueryState. + void UpdateQueryState(beeswax::QueryState::type query_state); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee9fbeca/tests/hs2/test_hs2.py ---------------------------------------------------------------------- diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py index c183001..42c3fb7 100644 --- a/tests/hs2/test_hs2.py +++ b/tests/hs2/test_hs2.py @@ -181,12 +181,34 @@ class TestHS2(HS2TestSuite): get_operation_status_resp = \ self.get_operation_status(execute_statement_resp.operationHandle) TestHS2.check_response(get_operation_status_resp) - + # If ExecuteStatement() has completed but the results haven't been fetched yet, the + # query must have at least reached RUNNING. assert get_operation_status_resp.operationState in \ - [TCLIService.TOperationState.INITIALIZED_STATE, - TCLIService.TOperationState.RUNNING_STATE, + [TCLIService.TOperationState.RUNNING_STATE, TCLIService.TOperationState.FINISHED_STATE] + fetch_results_req = TCLIService.TFetchResultsReq() + fetch_results_req.operationHandle = execute_statement_resp.operationHandle + fetch_results_req.maxRows = 100 + fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req) + + get_operation_status_resp = \ + self.get_operation_status(execute_statement_resp.operationHandle) + TestHS2.check_response(get_operation_status_resp) + # After fetching the results, the query must be in state FINISHED. 
+ assert get_operation_status_resp.operationState == \ + TCLIService.TOperationState.FINISHED_STATE + + close_operation_req = TCLIService.TCloseOperationReq() + close_operation_req.operationHandle = execute_statement_resp.operationHandle + TestHS2.check_response(self.hs2_client.CloseOperation(close_operation_req)) + + get_operation_status_resp = \ + self.get_operation_status(execute_statement_resp.operationHandle) + # GetOperationState should return 'Invalid query handle' if the query has been closed. + TestHS2.check_response(get_operation_status_resp, \ + TCLIService.TStatusCode.ERROR_STATUS) + @needs_session(conf_overlay={"abort_on_error": "1"}) def test_get_operation_status_error(self): """Tests that GetOperationStatus returns a valid result for a query that encountered @@ -396,6 +418,21 @@ class TestHS2(HS2TestSuite): get_profile_resp = self.hs2_client.GetRuntimeProfile(get_profile_req) TestHS2.check_response(get_profile_resp) assert execute_statement_req.statement in get_profile_resp.profile + # If ExecuteStatement() has completed but the results haven't been fetched yet, the + # query must have at least reached RUNNING. + assert "Query State: RUNNING" in get_profile_resp.profile or \ + "Query State: FINISHED" in get_profile_resp.profile, get_profile_resp.profile + + fetch_results_req = TCLIService.TFetchResultsReq() + fetch_results_req.operationHandle = execute_statement_resp.operationHandle + fetch_results_req.maxRows = 100 + fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req) + + get_profile_resp = self.hs2_client.GetRuntimeProfile(get_profile_req) + TestHS2.check_response(get_profile_resp) + assert execute_statement_req.statement in get_profile_resp.profile + # After fetching the results, we must be in state FINISHED. 
+ assert "Query State: FINISHED" in get_profile_resp.profile, get_profile_resp.profile close_operation_req = TCLIService.TCloseOperationReq() close_operation_req.operationHandle = execute_statement_resp.operationHandle @@ -403,8 +440,8 @@ class TestHS2(HS2TestSuite): get_profile_resp = self.hs2_client.GetRuntimeProfile(get_profile_req) TestHS2.check_response(get_profile_resp) - assert execute_statement_req.statement in get_profile_resp.profile + assert "Query State: FINISHED" in get_profile_resp.profile, get_profile_resp.profile @needs_session(conf_overlay={"use:database": "functional"}) def test_change_default_database(self): http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee9fbeca/tests/query_test/test_observability.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py index 1b5cf25..0f82bd5 100644 --- a/tests/query_test/test_observability.py +++ b/tests/query_test/test_observability.py @@ -77,3 +77,18 @@ class TestObservability(ImpalaTestSuite): scan_idx = len(result.exec_summary) - 1 assert result.exec_summary[scan_idx]['operator'] == '00:SCAN HBASE' assert result.exec_summary[scan_idx]['detail'] == 'functional_hbase.alltypestiny' + + def test_get_profile(self): + """Tests that the query profile shows expected query states.""" + query = "select count(*) from functional.alltypes" + handle = self.execute_query_async(query, dict()) + profile = self.client.get_runtime_profile(handle) + # If ExecuteStatement() has completed but the results haven't been fetched yet, the + # query must have at least reached RUNNING. + assert "Query State: RUNNING" in profile or \ + "Query State: FINISHED" in profile, profile + + results = self.client.fetch(query, handle) + profile = self.client.get_runtime_profile(handle) + # After fetching the results, the query must be in state FINISHED. + assert "Query State: FINISHED" in profile, profile
