This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository

The following commit(s) were added to refs/heads/master by this push:
     new 86e611b  IMPALA-9756: avoid coord cancellation/unregistration races
86e611b is described below

commit 86e611bdb2a912e283ce1db01bf2940ee1583ff3
Author: Tim Armstrong <>
AuthorDate: Mon May 18 15:46:34 2020 -0700

    IMPALA-9756: avoid coord cancellation/unregistration races
    This patch aims to make the query tear-down path more predictable.
    Specifically, we want to ensure that Coordinator::ComputeQuerySummary()
    finishes before the query is unregistered. Previously, the query could
    be unregistered before ComputeQuerySummary() finished if cancellation
    occurred on a different thread from unregistration, e.g. if
    async_exec_thread_ cancelled the query, if an asynchronous Cancel() RPC
    came in, if the query was cancelled internally because of a timeout, or
    for some other reason. The race was possible for two reasons:
    * The synchronization between threads happened on UpdateExecState(),
      which is called before ComputeQuerySummary().
    * The unregistering thread may not call Coordinator::Cancel() if
      coord_exec_called_ is not set.
    This patch addresses both problems by adding synchronization so
    that Coordinator::Cancel() can block until ComputeQuerySummary()
    finishes, and by setting coord_exec_called_ even if
    'async_exec_thread_' is cancelling the query (which ensures that
    both threads call Coordinator::Cancel() and synchronize there).
    The blocking wait is only done on the finalizing thread. Other
    threads that initiate cancellation do not block on cancellation,
    so that threads cannot pile up waiting for the query.
    Added a regression test that failed before this change.
    Ran exhaustive tests.
    Change-Id: I62cacc06d9877d33b79a33aeb3b82195e639b5c4
    Reviewed-by: Impala Public Jenkins <>
    Tested-by: Impala Public Jenkins <>
 be/src/runtime/coordinator.cc          |  7 ++++++-
 be/src/runtime/coordinator.h           | 18 ++++++++++++++++--
 be/src/service/client-request-state.cc | 26 +++++++++++++++-----------
 be/src/service/client-request-state.h  |  8 ++++++--
 tests/util/cancel_util.py              | 14 ++++++++++++++
 5 files changed, 57 insertions(+), 16 deletions(-)

diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index ca1eaad..91415ce 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -729,6 +729,7 @@ void Coordinator::HandleExecStateTransition(
   // WaitForBackends() and CancelBackends() ensures that.
   // TODO: should move this off of the query execution path?
+  finalized_.Set(true);
 Status Coordinator::FinalizeHdfsDml() {
@@ -860,7 +861,7 @@ Status Coordinator::GetNext(QueryResultSet* results, int 
max_rows, bool* eos,
   return Status::OK();
-void Coordinator::Cancel() {
+void Coordinator::Cancel(bool wait_until_finalized) {
   // Illegal to call Cancel() before Exec() returns, so there's no danger of 
the cancel
   // RPC passing the exec RPC.
   DCHECK(exec_rpcs_complete_.Load()) << "Exec() must be called first";
@@ -870,6 +871,10 @@ void Coordinator::Cancel() {
   // waiting for cancellation. In that case, we want explicit cancellation to 
   // backend_exec_complete_barrier_, which we do by forcing cancellation.
   if (ReturnedAllResults()) CancelBackends(/*fire_and_forget=*/ true);
+  // IMPALA-5756: Wait until finalized, in case a different thread was 
handling the
+  // transition to the terminal state.
+  if (wait_until_finalized) finalized_.Get();
 void Coordinator::CancelBackends(bool fire_and_forget) {
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 6bbaeca..9a488d6 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -33,6 +33,7 @@
 #include "runtime/dml-exec-state.h"
 #include "util/counting-barrier.h"
 #include "util/progress-updater.h"
+#include "util/promise.h"
 #include "util/runtime-profile-counters.h"
 #include "util/spinlock.h"
@@ -98,6 +99,11 @@ struct FragmentExecParams;
 /// Lifecycle: this object must not be destroyed until after one of the three 
 /// above is reached (error, cancelled, or EOS) to ensure resources are 
+/// There is work that needs to be done as part of the transition into each of 
+/// above states. That work is done by the thread that triggered the state 
+/// *after* the atomic state transition by calling 
HandleExecStateTransition(). If
+/// another thread depends on that work being complete, they can wait on the 
+/// promise.
 /// Lock ordering: (lower-numbered acquired before higher-numbered)
 /// 1. wait_lock_
@@ -141,8 +147,12 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
       int64_t block_on_wait_time_us) WARN_UNUSED_RESULT;
   /// Cancel execution of query and sets the overall query status to CANCELLED 
if the
-  /// query is still executing. Idempotent.
-  void Cancel();
+  /// query is still executing. Idempotent. If 'wait_until_finalized' is true 
and another
+  /// thread is cancelling 'coord_', block until the query profile is 
completed, e.g.
+  /// if ComputeQuerySummary() is running concurrently. This is only used by 
the thread
+  /// unregistering the query - other threads should return immediately if the 
+  /// is in the process of being cancelled.
+  void Cancel(bool wait_until_finalized=false);
   /// Called by the report status RPC handler to update execution status of a 
   /// backend as well as dml_exec_state_ and the profile. This may block if 
exec RPCs are
@@ -410,6 +420,10 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// - ERROR: error status
   Status exec_status_;
+  /// Set when all of the work to transition into a terminal state is complete,
+  /// including ComputeQuerySummary().
+  Promise<bool> finalized_;
   /// Contains all the state about filters being handled by this coordinator.
   std::unique_ptr<FilterRoutingTable> filter_routing_table_;
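diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 28bece7..5a54dbc 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc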
@@ -547,18 +547,23 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() {
   DebugActionNoFail(schedule_->query_options(), "CRS_AFTER_COORD_STARTS");
+  // Make coordinator profile visible, even upon failure.
+  if (coord_->query_profile() != nullptr) 
   bool cancelled = false;
   Status cancellation_status;
     lock_guard<mutex> l(lock_);
     if (!UpdateQueryStatus(exec_status).ok()) return;
+    // Coordinator::Exec() finished successfully - it is safe to concurrently 
+    // 'coord_'. This thread needs to cancel the coordinator if cancellation 
+    // *before* 'coord_' was accessible to other threads. Once the lock is 
dropped, any
+    // future calls to Cancel() are responsible for calling 
Coordinator::Cancel(), so
+    // while holding the lock we need to both perform a check for cancellation 
and make
+    // the coord_ visible.
+    coord_exec_called_.Store(true);
     cancelled = is_cancelled_;
-    if (!cancelled) {
-      // Once the lock is dropped, any future calls to Cancel() are 
responsible for
-      // calling Coordinator::Cancel(), so while holding the lock we need to 
both perform
-      // a check for cancellation and make the coord_ visible.
-      coord_exec_called_.Store(true);
-    } else {
+    if (cancelled) {
       VLOG_QUERY << "Cancelled right after starting the coordinator query id="
                  << PrintId(query_id());
@@ -569,8 +574,6 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() {
-  profile_->AddChild(coord_->query_profile());
@@ -730,7 +733,7 @@ Status ClientRequestState::ExecShutdownRequest() {
 Status ClientRequestState::Finalize(bool check_inflight, const Status* cause) {
-  RETURN_IF_ERROR(Cancel(check_inflight, cause));
+  RETURN_IF_ERROR(Cancel(check_inflight, cause, 
   // Make sure we join on wait_thread_ before we finish (and especially before 
this object
   // is destroyed).
@@ -1139,7 +1142,8 @@ Status ClientRequestState::FetchRowsInternal(const 
int32_t max_rows,
   return Status::OK();
-Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) {
+Status ClientRequestState::Cancel(
+    bool check_inflight, const Status* cause, bool wait_until_finalized) {
   if (check_inflight) {
     // If the query is in 'inflight_queries' it means that the query has 
actually started
     // executing. It is ok if the query is removed from 'inflight_queries' 
@@ -1175,7 +1179,7 @@ Status ClientRequestState::Cancel(bool check_inflight, 
const Status* cause) {
   // Ensure the parent query is cancelled if execution has started (if the 
query was not
   // started, cancellation is handled by the 'async-exec-thread' thread). 
'lock_' should
   // not be held because cancellation involves RPCs and can block for a long 
-  if (GetCoordinator() != nullptr) GetCoordinator()->Cancel();
+  if (GetCoordinator() != nullptr) 
   return Status::OK();
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 51f0276..a69ed1e 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -170,12 +170,16 @@ class ClientRequestState {
   /// Cancels the child queries and the coordinator with the given cause.
   /// If cause is NULL, it assumes this was deliberately cancelled by the user 
while in
   /// FINISHED state. Otherwise, sets state to ERROR (TODO: IMPALA-1262: use 
-  /// Does nothing if the query has reached EOS or already cancelled.
+  /// Does nothing if the query has reached EOS or already cancelled. If
+  /// 'wait_until_finalized' is true and another thread is cancelling 
'coord_', block
+  /// until cancellaton of 'coord_' finishes and it is finalized.
+  /// 'wait_until_finalized' should only used by the single thread finalizing 
the query,
+  /// to avoid many threads piling up waiting for query cancellation.
   /// Only returns an error if 'check_inflight' is true and the query is not 
   /// in-flight. Otherwise, proceed and return Status::OK() even if the query 
   /// in-flight (for cleaning up after an error on the query issuing path).
-  Status Cancel(bool check_inflight, const Status* cause) WARN_UNUSED_RESULT;
+  Status Cancel(bool check_inflight, const Status* cause, bool 
   /// This is called when the query is done (finished, cancelled, or failed). 
This runs
   /// synchronously within the last client RPC and does any work that is 
required before
diff --git a/tests/util/cancel_util.py b/tests/util/cancel_util.py
index 39e4700..520f958 100644
--- a/tests/util/cancel_util.py
+++ b/tests/util/cancel_util.py
@@ -55,6 +55,20 @@ def cancel_query_and_validate_state(client, query, 
exec_option, table_format,
   # Before accessing fetch_results_error we need to join the fetch thread
+  # IMPALA-9756: Make sure query summary info has been added to profile for 
+  # that proceeded far enough into execution that it should have been added to 
+  # The logic in ClientRequestState/Coordinator is convoluted, but the summary 
+  # should be added if the query has got to the point where rows can be 
fetched. We
+  # need to do this after both close_query() and fetch() have returned to 
+  # that the synchronous phase of query unregistration has finished and the 
+  # is final.
+  profile = client.get_runtime_profile(handle)
+  if ("- Completed admission: " in profile and
+      ("- First row fetched:" in profile or "- Request finished:" in profile)):
+    # TotalBytesRead is a sentinel that will only be created if 
+    # has been run by the cancelling thread.
+    assert "- TotalBytesRead:" in profile, profile
   if thread.fetch_results_error is None:
     # If the fetch rpc didn't result in CANCELLED (and auto-close the query) 
     # the close rpc should have succeeded.

Reply via email to