Repository: incubator-impala
Updated Branches:
  refs/heads/master 585ee48dc -> 523130108

IMPALA-1633: GetOperationStatus should set errorMessage and sqlState

Currently, we never populate the errorMessage or sqlState fields of
TGetOperationStatusResp when the GetOperationStatus HiveServer2 rpc is called.
This patch checks if the query has an error status and if so sets errorMessage
and sqlState.

GetOperationStatus also now takes the QueryExecState lock since
QueryExecState::query_state_ and QueryExecState::query_status_ are supposed to
be protected by it.

Additionally, this patch performs some cleanup and adds some documentation
around our behavior for updating QueryExecState::query_state_/query_status_.

This also addresses IMPALA-3298: TGetOperationStatusResp missing error message
when data is expired

Change-Id: Icb792f88286779fcf2ce409828de818bc4e80bed
Reviewed-on: http://gerrit.cloudera.org:8080/3094
Reviewed-by: Thomas Tauber-Marshall <[email protected]>
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/52313010
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/52313010
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/52313010

Branch: refs/heads/master
Commit: 5231301084c76bd48059620798579873d935079f
Parents: 585ee48
Author: Thomas Tauber-Marshall <[email protected]>
Authored: Fri May 13 17:07:20 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Wed Jun 1 19:32:39 2016 -0700

----------------------------------------------------------------------
 be/src/service/impala-beeswax-server.cc |  6 ++---
 be/src/service/impala-hs2-server.cc     | 17 ++++++++++----
 be/src/service/query-exec-state.cc      |  8 ++++---
 be/src/service/query-exec-state.h       |  9 +++++---
 tests/hs2/hs2_test_suite.py             | 25 +++++++++++++++++++++
 tests/hs2/test_hs2.py                   | 33 +++++++++++++++++++---------
 6 files changed, 75 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/52313010/be/src/service/impala-beeswax-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc index 35cb945..f26ace6 100644 --- a/be/src/service/impala-beeswax-server.cc +++ b/be/src/service/impala-beeswax-server.cc @@ -185,7 +185,7 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) { RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state), SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION); - exec_state->UpdateQueryState(QueryState::RUNNING); + exec_state->UpdateNonErrorQueryState(QueryState::RUNNING); // start thread to wait for results to become available, which will allow // us to advance query state to FINISHED or EXCEPTION exec_state->WaitAsync(); @@ -226,7 +226,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query, RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state), SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION); - exec_state->UpdateQueryState(QueryState::RUNNING); + exec_state->UpdateNonErrorQueryState(QueryState::RUNNING); // Once the query is running do a final check for session closure and add it to the // set of in-flight queries. 
Status status = SetQueryInflight(session, exec_state); @@ -242,7 +242,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query, RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR); } - exec_state->UpdateQueryState(QueryState::FINISHED); + exec_state->UpdateNonErrorQueryState(QueryState::FINISHED); TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle); // If the input log context id is an empty string, then create a new number and http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/52313010/be/src/service/impala-hs2-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index ce5df38..9bb9a56 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -462,7 +462,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, return; } - exec_state->UpdateQueryState(QueryState::FINISHED); + exec_state->UpdateNonErrorQueryState(QueryState::FINISHED); Status inflight_status = SetQueryInflight(session, exec_state); if (!inflight_status.ok()) { @@ -767,7 +767,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val, HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR); } } - exec_state->UpdateQueryState(QueryState::RUNNING); + exec_state->UpdateNonErrorQueryState(QueryState::RUNNING); // Start thread to wait for results to become available. 
exec_state->WaitAsync(); // Once the query is running do a final check for session closure and add it to the @@ -937,9 +937,18 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val, lock_guard<mutex> l(query_exec_state_map_lock_); QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id); if (entry != query_exec_state_map_.end()) { - QueryState::type query_state = entry->second->query_state(); - TOperationState::type operation_state = QueryStateToTOperationState(query_state); + QueryExecState *exec_state = entry->second.get(); + lock_guard<mutex> l(*exec_state->lock()); + TOperationState::type operation_state = QueryStateToTOperationState( + exec_state->query_state()); return_val.__set_operationState(operation_state); + if (operation_state == TOperationState::ERROR_STATE) { + DCHECK(!exec_state->query_status().ok()); + return_val.__set_errorMessage(exec_state->query_status().GetDetail()); + return_val.__set_sqlState(SQLSTATE_GENERAL_ERROR); + } else { + DCHECK(exec_state->query_status().ok()); + } return; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/52313010/be/src/service/query-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc index ec70c47..94a3589 100644 --- a/be/src/service/query-exec-state.cc +++ b/be/src/service/query-exec-state.cc @@ -605,7 +605,7 @@ void ImpalaServer::QueryExecState::Wait() { UpdateQueryStatus(status); } if (status.ok()) { - UpdateQueryState(QueryState::FINISHED); + UpdateNonErrorQueryState(QueryState::FINISHED); } } @@ -674,8 +674,9 @@ Status ImpalaServer::QueryExecState::RestartFetch() { return Status::OK(); } -void ImpalaServer::QueryExecState::UpdateQueryState(QueryState::type query_state) { +void ImpalaServer::QueryExecState::UpdateNonErrorQueryState(QueryState::type query_state) { lock_guard<mutex> l(lock_); + DCHECK(query_state != QueryState::EXCEPTION); if 
(query_state_ < query_state) query_state_ = query_state; } @@ -845,9 +846,10 @@ void ImpalaServer::QueryExecState::Cancel(const Status* cause) { if (eos_ || query_state_ == QueryState::EXCEPTION) return; if (cause != NULL) { + DCHECK(!cause->ok()); UpdateQueryStatus(*cause); query_events_->MarkEvent("Cancelled"); - query_state_ = QueryState::EXCEPTION; + DCHECK_EQ(query_state_, QueryState::EXCEPTION); } if (coord_.get() != NULL) coord_->Cancel(cause); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/52313010/be/src/service/query-exec-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.h b/be/src/service/query-exec-state.h index 842ae7b..0b62a28 100644 --- a/be/src/service/query-exec-state.h +++ b/be/src/service/query-exec-state.h @@ -102,9 +102,11 @@ class ImpalaServer::QueryExecState { /// The caller must hold fetch_rows_lock_ and lock_. Status RestartFetch(); - /// Update query state if the requested state isn't already obsolete. + /// Update query state if the requested state isn't already obsolete. This is only for + /// non-error states - if the query encounters an error the query status needs to be set + /// with information about the error so UpdateQueryStatus must be used instead. /// Takes lock_. - void UpdateQueryState(beeswax::QueryState::type query_state); + void UpdateNonErrorQueryState(beeswax::QueryState::type query_state); /// Update the query status and the "Query Status" summary profile string. 
/// If current status is already != ok, no update is made (we preserve the first error) @@ -163,7 +165,6 @@ class ImpalaServer::QueryExecState { boost::mutex* lock() { return &lock_; } boost::mutex* fetch_rows_lock() { return &fetch_rows_lock_; } const beeswax::QueryState::type query_state() const { return query_state_; } - void set_query_state(beeswax::QueryState::type state) { query_state_ = state; } const Status& query_status() const { return query_status_; } void set_result_metadata(const TResultSetMetadata& md) { result_metadata_ = md; } const RuntimeProfile& profile() const { return profile_; } @@ -281,6 +282,8 @@ class ImpalaServer::QueryExecState { RuntimeProfile::EventSequence* query_events_; std::vector<ExprContext*> output_expr_ctxs_; bool eos_; // if true, there are no more rows to return + // We enforce the invariant that query_status_ is not OK iff query_state_ + // is EXCEPTION, given that lock_ is held. beeswax::QueryState::type query_state_; Status query_status_; TExecRequest exec_request_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/52313010/tests/hs2/hs2_test_suite.py ---------------------------------------------------------------------- diff --git a/tests/hs2/hs2_test_suite.py b/tests/hs2/hs2_test_suite.py index e8bb199..4f3c50c 100644 --- a/tests/hs2/hs2_test_suite.py +++ b/tests/hs2/hs2_test_suite.py @@ -21,6 +21,7 @@ from thrift.transport.TSocket import TSocket from thrift.transport.TTransport import TBufferedTransport from thrift.protocol import TBinaryProtocol from tests.common.impala_test_suite import ImpalaTestSuite, IMPALAD_HS2_HOST_PORT +from time import sleep, time def needs_session(protocol_version= TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6, @@ -200,3 +201,27 @@ class HS2TestSuite(ImpalaTestSuite): break formatted += (", ".join(row) + "\n") return (num_rows, formatted) + + def get_operation_status(self, operation_handle): + """Executes GetOperationStatus with the given operation handle and returns the 
+ TGetOperationStatusResp""" + get_operation_status_req = TCLIService.TGetOperationStatusReq() + get_operation_status_req.operationHandle = operation_handle + get_operation_status_resp = \ + self.hs2_client.GetOperationStatus(get_operation_status_req) + return get_operation_status_resp + + def wait_for_operation_state(self, operation_handle, expected_state, \ + timeout = 10, interval = 1): + """Waits for the operation to reach expected_state by polling GetOperationStatus every + interval seconds, returning the TGetOperationStatusResp, or raising an assertion after + timeout seconds.""" + start_time = time() + while (time() - start_time < timeout): + get_operation_status_resp = self.get_operation_status(operation_handle) + HS2TestSuite.check_response(get_operation_status_resp) + if get_operation_status_resp.operationState is expected_state: + return get_operation_status_resp + sleep(interval) + assert False, 'Did not reach expected operation state %s in time, actual state was ' \ + '%s' % (expected_state, get_operation_status_resp.operationState) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/52313010/tests/hs2/test_hs2.py ---------------------------------------------------------------------- diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py index a8c4928..90be391 100644 --- a/tests/hs2/test_hs2.py +++ b/tests/hs2/test_hs2.py @@ -24,6 +24,7 @@ from ImpalaService import ImpalaHiveServer2Service from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session, operation_id_to_query_id from TCLIService import TCLIService +SQLSTATE_GENERAL_ERROR = "HY000" class TestHS2(HS2TestSuite): def test_open_session(self): @@ -164,11 +165,8 @@ class TestHS2(HS2TestSuite): execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req) TestHS2.check_response(execute_statement_resp) - get_operation_status_req = TCLIService.TGetOperationStatusReq() - get_operation_status_req.operationHandle = execute_statement_resp.operationHandle - 
get_operation_status_resp = \ - self.hs2_client.GetOperationStatus(get_operation_status_req) + self.get_operation_status(execute_statement_resp.operationHandle) TestHS2.check_response(get_operation_status_resp) assert get_operation_status_resp.operationState in \ @@ -176,6 +174,24 @@ class TestHS2(HS2TestSuite): TCLIService.TOperationState.RUNNING_STATE, TCLIService.TOperationState.FINISHED_STATE] + @needs_session(conf_overlay={"abort_on_error": "1"}) + def test_get_operation_status_error(self): + """Tests that GetOperationStatus returns a valid result for a query that encountered + an error""" + execute_statement_req = TCLIService.TExecuteStatementReq() + execute_statement_req.sessionHandle = self.session_handle + execute_statement_req.statement = "SELECT * FROM functional.alltypeserror" + execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req) + TestHS2.check_response(execute_statement_resp) + + get_operation_status_resp = self.wait_for_operation_state( \ + execute_statement_resp.operationHandle, TCLIService.TOperationState.ERROR_STATE) + + # Check that an error message and sql state have been set. 
+ assert get_operation_status_resp.errorMessage is not None and \ + get_operation_status_resp.errorMessage is not "" + assert get_operation_status_resp.sqlState == SQLSTATE_GENERAL_ERROR + @needs_session() def test_malformed_get_operation_status(self): """Tests that a short guid / secret returns an error (regression would be to crash @@ -189,13 +205,10 @@ class TestHS2(HS2TestSuite): operation_handle.operationType = TCLIService.TOperationType.EXECUTE_STATEMENT operation_handle.hasResultSet = False - get_operation_status_req = TCLIService.TGetOperationStatusReq() - get_operation_status_req.operationHandle = operation_handle + get_operation_status_resp = self.get_operation_status(operation_handle) + TestHS2.check_response(get_operation_status_resp, \ + TCLIService.TStatusCode.ERROR_STATUS) - get_operation_status_resp = \ - self.hs2_client.GetOperationStatus(get_operation_status_req) - TestHS2.check_response(get_operation_status_resp, - TCLIService.TStatusCode.ERROR_STATUS) err_msg = "(guid size: %d, expected 16, secret size: %d, expected 16)" \ % (len(operation_handle.operationId.guid), len(operation_handle.operationId.secret))
