This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new d8a8412c2 IMPALA-13294: Add support for long polling to avoid client 
side wait
d8a8412c2 is described below

commit d8a8412c2b750937a3577b08d81ffd9a16269b83
Author: Joe McDonnell <[email protected]>
AuthorDate: Fri Nov 4 12:48:32 2022 -0700

    IMPALA-13294: Add support for long polling to avoid client side wait
    
    Currently, Impala does an execute call, then the client polls
    waiting for the operation to finish (or error out). The client
    sleeps between polls, and this sleep time can be a substantial
    percentage of a short query's execution time.
    
    To reduce this client side sleep, this implements long polling to
    provide an option to wait for query completion on the server side.
    This is controlled by the long_polling_time_ms query option. If
    set to greater than zero, status RPCs will wait for query
    completion for up to that amount of time. This defaults to off (0ms).
    
    Both Beeswax and HS2 add a wait for query completion in their
    get status calls (get_state for Beeswax, GetOperationStatus for HS2).
    This doesn't wait in the execute RPC calls (e.g. query for Beeswax,
    ExecuteStatement for HS2), because neither includes the query status
    in the response. The client will always need to do a separate status
    RPC.
    
    This modifies impala-shell and the beeswax client to avoid doing a
    sleep if the get_state/GetOperationStatus calls take longer than
    they would have slept. In other words, if they would have slept 50ms,
    then they skip that sleep if the RPC to the server took longer than
    50ms. This allows the client to maintain its sleep behavior with
    older Impalas that don't use long polling while adapting properly
    to systems that do have long polling. This has the added benefit
    that it also adjusts for high latency to the server. This does
    not change any of the sleep times.
    
    Testing:
     - This adds a test case in test_hs2.py to verify the long
       polling behavior
    
    Change-Id: I72ca595c5dd8a33b936f078f7f7faa5b3f0f337d
    Reviewed-on: http://gerrit.cloudera.org:8080/19205
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/client-request-state.cc  | 25 +++++++--
 be/src/service/client-request-state.h   | 12 +++++
 be/src/service/impala-beeswax-server.cc |  5 ++
 be/src/service/impala-hs2-server.cc     |  4 ++
 be/src/service/impala-server.h          |  1 +
 be/src/service/query-options.cc         |  7 +++
 be/src/service/query-options.h          |  3 +-
 common/thrift/ImpalaService.thrift      |  6 +++
 common/thrift/Query.thrift              |  3 ++
 shell/impala_client.py                  |  6 ++-
 tests/beeswax/impala_beeswax.py         | 19 ++++++-
 tests/hs2/test_hs2.py                   | 95 +++++++++++++++++++++++++++++++++
 12 files changed, 179 insertions(+), 7 deletions(-)

diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index cffdebd60..a4a84ca7b 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1841,9 +1841,28 @@ void ClientRequestState::ClearResultCache() {
 }
 
 void ClientRequestState::UpdateExecState(ExecState exec_state) {
-  exec_state_.Store(exec_state);
-  summary_profile_->AddInfoString("Query State", 
PrintValue(BeeswaxQueryState()));
-  summary_profile_->AddInfoString("Impala Query State", 
ExecStateToString(exec_state));
+  {
+    lock_guard<mutex> l(exec_state_lock_);
+    exec_state_.Store(exec_state);
+    summary_profile_->AddInfoString("Query State", 
PrintValue(BeeswaxQueryState()));
+    summary_profile_->AddInfoString("Impala Query State", 
ExecStateToString(exec_state));
+  }
+  // Drop exec_state_lock_ before signalling
+  exec_state_cv_.NotifyAll();
+}
+
+void ClientRequestState::WaitForCompletionExecState() {
+  if (query_options().long_polling_time_ms <= 0) return;
+  int64_t timeout_us = query_options().long_polling_time_ms * MICROS_PER_MILLI;
+  unique_lock<mutex> l(exec_state_lock_);
+  timespec deadline;
+  TimeFromNowMicros(timeout_us, &deadline);
+  bool timed_out = false;
+  while (exec_state() != ExecState::FINISHED &&
+         exec_state() != ExecState::ERROR &&
+         !timed_out) {
+    timed_out = !exec_state_cv_.WaitUntil(l, deadline);
+  }
 }
 
 TOperationState::type ClientRequestState::TOperationState() const {
diff --git a/be/src/service/client-request-state.h 
b/be/src/service/client-request-state.h
index 8457728ae..ee03c9353 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -309,6 +309,11 @@ class ClientRequestState {
   /// ExecState is stored using an AtomicEnum, so reads do not require holding 
lock_.
   ExecState exec_state() const { return exec_state_.Load(); }
 
+  /// WaitForCompletionExecState waits until the state reaches FINISHED or 
ERROR
+  /// or it reaches the timeout. The timeout is specified using the 
long_polling_time_ms
+  /// query option.
+  void WaitForCompletionExecState();
+
   /// RetryState is stored using an AtomicEnum, so reads do not require 
holding lock_.
   RetryState retry_state() const { return retry_state_.Load(); }
 
@@ -570,6 +575,13 @@ class ClientRequestState {
   /// Callers are expected to use BlockOnWait() for Wait() to finish.
   ConditionVariable block_on_wait_cv_;
 
+  /// Lock used to synchronize access for exec_state_cv_. Some callers hold 
lock_ while
+  /// acquiring this lock, so if lock_ is needed it must be acquired before 
this lock.
+  std::mutex exec_state_lock_;
+
+  /// Condition variable used to signal threads that are watching the 
exec_state_.
+  ConditionVariable exec_state_cv_;
+
   /// Wait condition used in conjunction with block_on_wait_cv_.
   bool is_wait_done_ = false;
 
diff --git a/be/src/service/impala-beeswax-server.cc 
b/be/src/service/impala-beeswax-server.cc
index 8d08a4d5f..499b3f13d 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -324,6 +324,11 @@ beeswax::QueryState::type ImpalaServer::get_state(
   // Validate that query can be accessed by user.
   RAISE_IF_ERROR(CheckClientRequestSession(session.get(), 
query_handle->effective_user(),
       query_id), SQLSTATE_GENERAL_ERROR);
+
+  // When using long polling, this waits up to long_polling_time_ms 
milliseconds for
+  // query completion.
+  query_handle->WaitForCompletionExecState();
+
   // Take the lock to ensure that if the client sees a query_state == 
EXCEPTION, it is
   // guaranteed to see the error query_status.
   lock_guard<mutex> l(*query_handle->lock());
diff --git a/be/src/service/impala-hs2-server.cc 
b/be/src/service/impala-hs2-server.cc
index 67229c709..ab3cce89d 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -904,6 +904,10 @@ void 
ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
           session_id, SecretArg::Operation(op_secret, query_id), &session),
       SQLSTATE_GENERAL_ERROR);
 
+  // When using long polling, this waits up to long_polling_time_ms 
milliseconds for
+  // query completion.
+  query_handle->WaitForCompletionExecState();
+
   {
     lock_guard<mutex> l(*query_handle->lock());
     TOperationState::type operation_state = query_handle->TOperationState();
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 8533fd9b3..1d936dc4b 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -191,6 +191,7 @@ class TQueryExecRequest;
 /// 6. ClientRequestState::lock
 /// 7. ClientRequestState::expiration_data_lock_
 /// 8. Coordinator::exec_summary_lock
+/// 9. ClientRequestState::exec_state_lock_
 ///
 /// The following locks are not held in conjunction with other locks:
 /// * query_log_lock_
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 7160f3a89..938efaba9 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1305,6 +1305,13 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type 
option, const string& va
         query_options->__set_write_kudu_utc_timestamps(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::LONG_POLLING_TIME_MS: {
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+            option, value, &int32_t_val));
+        query_options->__set_long_polling_time_ms(int32_t_val);
+        break;
+      }
       default:
         string key = to_string(option);
         if (IsRemovedQueryOption(key)) {
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 25b645f28..473f2ac3e 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -52,7 +52,7 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                       
          \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                       
          \
-      TImpalaQueryOptions::DISABLE_OPTIMIZED_JSON_COUNT_STAR + 1);             
          \
+      TImpalaQueryOptions::LONG_POLLING_TIME_MS + 1);                          
          \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, 
ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)     
          \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)   
          \
@@ -337,6 +337,7 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
       WRITE_KUDU_UTC_TIMESTAMPS, TQueryOptionLevel::ADVANCED)                  
          \
   QUERY_OPT_FN(disable_optimized_json_count_star, 
DISABLE_OPTIMIZED_JSON_COUNT_STAR,     \
       TQueryOptionLevel::ADVANCED)                                             
          \
+  QUERY_OPT_FN(long_polling_time_ms, LONG_POLLING_TIME_MS, 
TQueryOptionLevel::REGULAR)   \
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query 
state.
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index b25223789..a3e4c1c66 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -956,6 +956,12 @@ enum TImpalaQueryOptions {
 
   // Turns off optimized JSON count star (zero slots) scan, falls back to 
rapidjson parse.
   DISABLE_OPTIMIZED_JSON_COUNT_STAR = 181
+
+  // How long GetOperationStatus (HS2) and get_state (Beeswax) RPCs wait on the server
+  // side for statement completion, in milliseconds. Waiting on the server side allows
+  // for immediate notification when the query completes and avoids added latency from
+  // waiting on the client side. This defaults to off (0ms).
+  LONG_POLLING_TIME_MS = 182
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 80bf4a90a..6fb804106 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -741,6 +741,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   182: optional bool disable_optimized_json_count_star = false;
+
+  // See comment in ImpalaService.thrift
+  183: optional i32 long_polling_time_ms = 0;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and 
external
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 8ec1d72ab..962d8c7ad 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -269,7 +269,9 @@ class ImpalaClient(object):
     If 'periodic_callback' is provided, it is called periodically with no 
arguments."""
     loop_start = time.time()
     while True:
+      start_rpc_time = time.time()
       query_state = self.get_query_state(last_query_handle)
+      rpc_time = time.time() - start_rpc_time
       if query_state == self.FINISHED_STATE:
         break
       elif query_state in (self.ERROR_STATE, self.CANCELED_STATE):
@@ -280,7 +282,9 @@ class ImpalaClient(object):
           raise DisconnectedException("Not connected to impalad.")
 
       if periodic_callback is not None: periodic_callback()
-      time.sleep(self._get_sleep_interval(loop_start))
+      sleep_time = self._get_sleep_interval(loop_start)
+      if rpc_time < sleep_time:
+        time.sleep(sleep_time - rpc_time)
 
   def get_query_state(self, last_query_handle):
     """Return the query state string for 'last_query_handle'. Returns 
self.ERROR_STATE
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index 226fe38d2..528eaeec2 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -389,11 +389,20 @@ class ImpalaBeeswaxClient(object):
   def close_query(self, handle):
     self.__do_rpc(lambda: self.imp_service.close(handle))
 
+  def _get_sleep_interval(self, start_time):
+    """Returns the time to sleep in seconds before polling again. This uses a 
fixed
+       50 millisecond sleep that doesn't vary by elapsed time. This is only 
used for
+       testing, and there is no reason to sleep longer for test 
environments."""
+    return 0.05
+
   def wait_for_finished(self, query_handle):
     """Given a query handle, polls the coordinator waiting for the query to 
transition to
        'FINISHED' state"""
+    loop_start = time.time()
     while True:
+      start_rpc_time = time.time()
       query_state = self.get_state(query_handle)
+      rpc_time = time.time() - start_rpc_time
       # if the rpc succeeded, the output is the query state
       if query_state == self.query_states["FINISHED"]:
         break
@@ -404,14 +413,18 @@ class ImpalaBeeswaxClient(object):
           raise ImpalaBeeswaxException(error_log, None)
         finally:
           self.close_query(query_handle)
-      time.sleep(0.05)
+      sleep_time = self._get_sleep_interval(loop_start)
+      if rpc_time < sleep_time:
+        time.sleep(sleep_time - rpc_time)
 
   def wait_for_finished_timeout(self, query_handle, timeout=10):
     """Given a query handle and a timeout, polls the coordinator waiting for 
the query to
        transition to 'FINISHED' state till 'timeout' seconds"""
     start_time = time.time()
     while (time.time() - start_time < timeout):
+      start_rpc_time = time.time()
       query_state = self.get_state(query_handle)
+      rpc_time = time.time() - start_rpc_time
       # if the rpc succeeded, the output is the query state
       if query_state == self.query_states["FINISHED"]:
         return True
@@ -422,7 +435,9 @@ class ImpalaBeeswaxClient(object):
           raise ImpalaBeeswaxException(error_log, None)
         finally:
           self.close_query(query_handle)
-      time.sleep(0.05)
+      sleep_time = self._get_sleep_interval(start_time)
+      if rpc_time < sleep_time:
+        time.sleep(sleep_time - rpc_time)
     return False
 
   def wait_for_admission_control(self, query_handle):
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index 02847442d..7b965389b 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -364,6 +364,101 @@ class TestHS2(HS2TestSuite):
         get_operation_status_resp.errorMessage is not ""
     assert get_operation_status_resp.sqlState == SQLSTATE_GENERAL_ERROR
 
+  @needs_session(conf_overlay={"long_polling_time_ms": "10000"})
+  def test_long_polling_success(self):
+    """Tests that GetOperationStatus waits for the query to complete and is 
interrupted
+    by the completion."""
+
+    # alltypestiny has 8 rows, so this statement is about 80ms of sleeps plus 
some
+    # regular execution.
+    statement = "SELECT count(sleep(10)) from functional.alltypestiny"
+    execute_statement_resp = self.execute_statement(statement)
+
+    start_time = time.time()
+    get_operation_status_resp = \
+        self.get_operation_status(execute_statement_resp.operationHandle)
+    end_time = time.time()
+    TestHS2.check_response(get_operation_status_resp)
+    # With long polling, get_operation_status returns early only if the query reaches a
+    # completion state within the interval. This is a short query that must complete
+    # before the long_polling_time_ms of 10 seconds, so it must have reached the
+    # FINISHED state.
+    assert get_operation_status_resp.operationState == \
+        TCLIService.TOperationState.FINISHED_STATE
+    # The long polling wait must have been interrupted by the completion, so 
it should
+    # not come anywhere close to waiting the full 10 seconds. 1 second is not 
a very
+    # tight time bound.
+    time_diff = end_time - start_time
+    assert time_diff < 1
+    # This should take at least 80ms, because that is the amount of time the 
query
+    # should sleep
+    assert time_diff >= 0.08
+
+    # Fetch the results so the query completes successfully
+    fetch_results_req = TCLIService.TFetchResultsReq()
+    fetch_results_req.operationHandle = execute_statement_resp.operationHandle
+    fetch_results_req.maxRows = 100
+    fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
+    TestHS2.check_response(fetch_results_resp)
+
+  @needs_session(conf_overlay={"long_polling_time_ms": "50"})
+  def test_long_polling_full_sleep(self):
+    """Tests that GetOperationStatus breaks out of its sleep at the proper 
time."""
+
+    # alltypestiny has 8 rows, so this statement is about 800ms of sleeps plus 
some
+    # regular execution.
+    statement = "SELECT count(sleep(100)) from functional.alltypestiny"
+    execute_statement_resp = self.execute_statement(statement)
+
+    while True:
+      start_time = time.time()
+      get_operation_status_resp = \
+          self.get_operation_status(execute_statement_resp.operationHandle)
+      end_time = time.time()
+      TestHS2.check_response(get_operation_status_resp)
+      # Each call into get_operation_status should wait at most 50ms. Verify 
that the
+      # sleeps are never longer than 100ms.
+      assert end_time - start_time < 0.1
+      if get_operation_status_resp.operationState == \
+         TCLIService.TOperationState.FINISHED_STATE:
+        break
+      # If this did not reach the finished state, then it should have waited 
at least
+      # 50ms.
+      if get_operation_status_resp.operationState != \
+         TCLIService.TOperationState.FINISHED_STATE:
+        assert end_time - start_time >= 0.050
+
+    # Fetch the results so the query completes successfully
+    fetch_results_req = TCLIService.TFetchResultsReq()
+    fetch_results_req.operationHandle = execute_statement_resp.operationHandle
+    fetch_results_req.maxRows = 100
+    fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
+    TestHS2.check_response(fetch_results_resp)
+
+  @needs_session(conf_overlay={"abort_on_error": "1", "long_polling_time_ms": 
"10000"})
+  def test_long_polling_error(self):
+    """Tests that GetOperationStatus waits for the query to hit an error and is
+    interrupted by the error."""
+
+    # This is the same as test_get_operation_status_error above with long 
polling.
+    # With long polling, get_operation_status waits for completion, but it 
will exit
+    # immediately when the statement hits an error. Because it waits, we know 
that
+    # exactly one get_operation_status call is enough (no need for a loop).
+    statement = "SELECT * FROM functional.alltypeserror"
+    execute_statement_resp = self.execute_statement(statement)
+    start_time = time.time()
+    get_operation_status_resp = \
+        self.get_operation_status(execute_statement_resp.operationHandle)
+    end_time = time.time()
+    TestHS2.check_response(get_operation_status_resp)
+    assert get_operation_status_resp.operationState == \
+        TCLIService.TOperationState.ERROR_STATE
+
+    # The long polling wait must have been interrupted by the error, so it 
should
+    # not come anywhere close to waiting the full 10 seconds. This is a very 
short
+    # statement, so it should hit the error within 1 second.
+    assert end_time - start_time < 1.0
+
   @needs_session()
   def test_malformed_get_operation_status(self):
     """Tests that a short guid / secret returns an error (regression would be 
to crash

Reply via email to