Repository: incubator-impala
Updated Branches:
  refs/heads/master 585ee48dc -> 523130108


IMPALA-1633: GetOperationStatus should set errorMessage and sqlState

Currently, we never populate the errorMessage or sqlState
fields of TGetOperationStatusResp when the GetOperationStatus
HiveServer2 RPC is called. This patch checks whether the query
has an error status and, if so, sets errorMessage and sqlState.

GetOperationStatus also now takes the QueryExecState lock since
QueryExecState::query_state_ and QueryExecState::query_status_
are supposed to be protected by it.

Additionally, this patch performs some cleanup and adds some
documentation around our behavior for updating
QueryExecState::query_state_/query_status_.

This also addresses IMPALA-3298: TGetOperationStatusResp missing
error message when data is expired

Change-Id: Icb792f88286779fcf2ce409828de818bc4e80bed
Reviewed-on: http://gerrit.cloudera.org:8080/3094
Reviewed-by: Thomas Tauber-Marshall <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/52313010
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/52313010
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/52313010

Branch: refs/heads/master
Commit: 5231301084c76bd48059620798579873d935079f
Parents: 585ee48
Author: Thomas Tauber-Marshall <[email protected]>
Authored: Fri May 13 17:07:20 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Wed Jun 1 19:32:39 2016 -0700

----------------------------------------------------------------------
 be/src/service/impala-beeswax-server.cc |  6 ++---
 be/src/service/impala-hs2-server.cc     | 17 ++++++++++----
 be/src/service/query-exec-state.cc      |  8 ++++---
 be/src/service/query-exec-state.h       |  9 +++++---
 tests/hs2/hs2_test_suite.py             | 25 +++++++++++++++++++++
 tests/hs2/test_hs2.py                   | 33 +++++++++++++++++++---------
 6 files changed, 75 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/52313010/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 35cb945..f26ace6 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -185,7 +185,7 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
   RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
       SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
 
-  exec_state->UpdateQueryState(QueryState::RUNNING);
+  exec_state->UpdateNonErrorQueryState(QueryState::RUNNING);
   // start thread to wait for results to become available, which will allow
   // us to advance query state to FINISHED or EXCEPTION
   exec_state->WaitAsync();
@@ -226,7 +226,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
   RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
       SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
 
-  exec_state->UpdateQueryState(QueryState::RUNNING);
+  exec_state->UpdateNonErrorQueryState(QueryState::RUNNING);
   // Once the query is running do a final check for session closure and add it to the
   // set of in-flight queries.
   Status status = SetQueryInflight(session, exec_state);
@@ -242,7 +242,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
 
-  exec_state->UpdateQueryState(QueryState::FINISHED);
+  exec_state->UpdateNonErrorQueryState(QueryState::FINISHED);
   TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
 
   // If the input log context id is an empty string, then create a new number and

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/52313010/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index ce5df38..9bb9a56 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -462,7 +462,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
     return;
   }
 
-  exec_state->UpdateQueryState(QueryState::FINISHED);
+  exec_state->UpdateNonErrorQueryState(QueryState::FINISHED);
 
   Status inflight_status = SetQueryInflight(session, exec_state);
   if (!inflight_status.ok()) {
@@ -767,7 +767,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
       HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
     }
   }
-  exec_state->UpdateQueryState(QueryState::RUNNING);
+  exec_state->UpdateNonErrorQueryState(QueryState::RUNNING);
   // Start thread to wait for results to become available.
   exec_state->WaitAsync();
   // Once the query is running do a final check for session closure and add it to the
@@ -937,9 +937,18 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
   lock_guard<mutex> l(query_exec_state_map_lock_);
   QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id);
   if (entry != query_exec_state_map_.end()) {
-    QueryState::type query_state = entry->second->query_state();
-    TOperationState::type operation_state = QueryStateToTOperationState(query_state);
+    QueryExecState *exec_state = entry->second.get();
+    lock_guard<mutex> l(*exec_state->lock());
+    TOperationState::type operation_state = QueryStateToTOperationState(
+        exec_state->query_state());
     return_val.__set_operationState(operation_state);
+    if (operation_state == TOperationState::ERROR_STATE) {
+      DCHECK(!exec_state->query_status().ok());
+      return_val.__set_errorMessage(exec_state->query_status().GetDetail());
+      return_val.__set_sqlState(SQLSTATE_GENERAL_ERROR);
+    } else {
+      DCHECK(exec_state->query_status().ok());
+    }
     return;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/52313010/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index ec70c47..94a3589 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -605,7 +605,7 @@ void ImpalaServer::QueryExecState::Wait() {
     UpdateQueryStatus(status);
   }
   if (status.ok()) {
-    UpdateQueryState(QueryState::FINISHED);
+    UpdateNonErrorQueryState(QueryState::FINISHED);
   }
 }
 
@@ -674,8 +674,9 @@ Status ImpalaServer::QueryExecState::RestartFetch() {
   return Status::OK();
 }
 
-void ImpalaServer::QueryExecState::UpdateQueryState(QueryState::type query_state) {
+void ImpalaServer::QueryExecState::UpdateNonErrorQueryState(QueryState::type query_state) {
   lock_guard<mutex> l(lock_);
+  DCHECK(query_state != QueryState::EXCEPTION);
   if (query_state_ < query_state) query_state_ = query_state;
 }
 
@@ -845,9 +846,10 @@ void ImpalaServer::QueryExecState::Cancel(const Status* cause) {
   if (eos_ || query_state_ == QueryState::EXCEPTION) return;
 
   if (cause != NULL) {
+    DCHECK(!cause->ok());
     UpdateQueryStatus(*cause);
     query_events_->MarkEvent("Cancelled");
-    query_state_ = QueryState::EXCEPTION;
+    DCHECK_EQ(query_state_, QueryState::EXCEPTION);
   }
   if (coord_.get() != NULL) coord_->Cancel(cause);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/52313010/be/src/service/query-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.h b/be/src/service/query-exec-state.h
index 842ae7b..0b62a28 100644
--- a/be/src/service/query-exec-state.h
+++ b/be/src/service/query-exec-state.h
@@ -102,9 +102,11 @@ class ImpalaServer::QueryExecState {
   /// The caller must hold fetch_rows_lock_ and lock_.
   Status RestartFetch();
 
-  /// Update query state if the requested state isn't already obsolete.
+  /// Update query state if the requested state isn't already obsolete. This is only for
+  /// non-error states - if the query encounters an error the query status needs to be set
+  /// with information about the error so UpdateQueryStatus must be used instead.
   /// Takes lock_.
-  void UpdateQueryState(beeswax::QueryState::type query_state);
+  void UpdateNonErrorQueryState(beeswax::QueryState::type query_state);
 
   /// Update the query status and the "Query Status" summary profile string.
   /// If current status is already != ok, no update is made (we preserve the first error)
@@ -163,7 +165,6 @@ class ImpalaServer::QueryExecState {
   boost::mutex* lock() { return &lock_; }
   boost::mutex* fetch_rows_lock() { return &fetch_rows_lock_; }
   const beeswax::QueryState::type query_state() const { return query_state_; }
-  void set_query_state(beeswax::QueryState::type state) { query_state_ = state; }
   const Status& query_status() const { return query_status_; }
   void set_result_metadata(const TResultSetMetadata& md) { result_metadata_ = md; }
   const RuntimeProfile& profile() const { return profile_; }
@@ -281,6 +282,8 @@ class ImpalaServer::QueryExecState {
   RuntimeProfile::EventSequence* query_events_;
   std::vector<ExprContext*> output_expr_ctxs_;
   bool eos_;  // if true, there are no more rows to return
+  // We enforce the invariant that query_status_ is not OK iff query_state_
+  // is EXCEPTION, given that lock_ is held.
   beeswax::QueryState::type query_state_;
   Status query_status_;
   TExecRequest exec_request_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/52313010/tests/hs2/hs2_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/hs2/hs2_test_suite.py b/tests/hs2/hs2_test_suite.py
index e8bb199..4f3c50c 100644
--- a/tests/hs2/hs2_test_suite.py
+++ b/tests/hs2/hs2_test_suite.py
@@ -21,6 +21,7 @@ from thrift.transport.TSocket import TSocket
 from thrift.transport.TTransport import TBufferedTransport
 from thrift.protocol import TBinaryProtocol
 from tests.common.impala_test_suite import ImpalaTestSuite, IMPALAD_HS2_HOST_PORT
+from time import sleep, time
 
 def needs_session(protocol_version=
                   TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6,
@@ -200,3 +201,27 @@ class HS2TestSuite(ImpalaTestSuite):
             break
       formatted += (", ".join(row) + "\n")
     return (num_rows, formatted)
+
+  def get_operation_status(self, operation_handle):
+    """Executes GetOperationStatus with the given operation handle and returns the
+    TGetOperationStatusResp"""
+    get_operation_status_req = TCLIService.TGetOperationStatusReq()
+    get_operation_status_req.operationHandle = operation_handle
+    get_operation_status_resp = \
+        self.hs2_client.GetOperationStatus(get_operation_status_req)
+    return get_operation_status_resp
+
+  def wait_for_operation_state(self, operation_handle, expected_state, \
+                               timeout = 10, interval = 1):
+    """Waits for the operation to reach expected_state by polling GetOperationStatus every
+    interval seconds, returning the TGetOperationStatusResp, or raising an assertion after
+    timeout seconds."""
+    start_time = time()
+    while (time() - start_time < timeout):
+      get_operation_status_resp = self.get_operation_status(operation_handle)
+      HS2TestSuite.check_response(get_operation_status_resp)
+      if get_operation_status_resp.operationState is expected_state:
+        return get_operation_status_resp
+      sleep(interval)
+    assert False, 'Did not reach expected operation state %s in time, actual state was ' \
+        '%s' % (expected_state, get_operation_status_resp.operationState)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/52313010/tests/hs2/test_hs2.py
----------------------------------------------------------------------
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index a8c4928..90be391 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -24,6 +24,7 @@ from ImpalaService import ImpalaHiveServer2Service
 from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session, operation_id_to_query_id
 from TCLIService import TCLIService
 
+SQLSTATE_GENERAL_ERROR = "HY000"
 
 class TestHS2(HS2TestSuite):
   def test_open_session(self):
@@ -164,11 +165,8 @@ class TestHS2(HS2TestSuite):
     execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
     TestHS2.check_response(execute_statement_resp)
 
-    get_operation_status_req = TCLIService.TGetOperationStatusReq()
-    get_operation_status_req.operationHandle = execute_statement_resp.operationHandle
-
     get_operation_status_resp = \
-        self.hs2_client.GetOperationStatus(get_operation_status_req)
+        self.get_operation_status(execute_statement_resp.operationHandle)
     TestHS2.check_response(get_operation_status_resp)
 
     assert get_operation_status_resp.operationState in \
@@ -176,6 +174,24 @@ class TestHS2(HS2TestSuite):
          TCLIService.TOperationState.RUNNING_STATE,
          TCLIService.TOperationState.FINISHED_STATE]
 
+  @needs_session(conf_overlay={"abort_on_error": "1"})
+  def test_get_operation_status_error(self):
+    """Tests that GetOperationStatus returns a valid result for a query that encountered
+    an error"""
+    execute_statement_req = TCLIService.TExecuteStatementReq()
+    execute_statement_req.sessionHandle = self.session_handle
+    execute_statement_req.statement = "SELECT * FROM functional.alltypeserror"
+    execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
+    TestHS2.check_response(execute_statement_resp)
+
+    get_operation_status_resp = self.wait_for_operation_state( \
+        execute_statement_resp.operationHandle, TCLIService.TOperationState.ERROR_STATE)
+
+    # Check that an error message and sql state have been set.
+    assert get_operation_status_resp.errorMessage is not None and \
+        get_operation_status_resp.errorMessage is not ""
+    assert get_operation_status_resp.sqlState == SQLSTATE_GENERAL_ERROR
+
   @needs_session()
   def test_malformed_get_operation_status(self):
     """Tests that a short guid / secret returns an error (regression would be to crash
@@ -189,13 +205,10 @@ class TestHS2(HS2TestSuite):
     operation_handle.operationType = TCLIService.TOperationType.EXECUTE_STATEMENT
     operation_handle.hasResultSet = False
 
-    get_operation_status_req = TCLIService.TGetOperationStatusReq()
-    get_operation_status_req.operationHandle = operation_handle
+    get_operation_status_resp = self.get_operation_status(operation_handle)
+    TestHS2.check_response(get_operation_status_resp, \
+        TCLIService.TStatusCode.ERROR_STATUS)
 
-    get_operation_status_resp = \
-        self.hs2_client.GetOperationStatus(get_operation_status_req)
-    TestHS2.check_response(get_operation_status_resp,
-                           TCLIService.TStatusCode.ERROR_STATUS)
     err_msg = "(guid size: %d, expected 16, secret size: %d, expected 16)" \
         % (len(operation_handle.operationId.guid),
            len(operation_handle.operationId.secret))

Reply via email to