Repository: incubator-impala
Updated Branches:
  refs/heads/master e71e359af -> ed87c4060


IMPALA-5749: coordinator race hits DCHECK 'num_remaining_backends_ > 0'

In Coordinator::UpdateBackendExecStatus(), we call
BackendState::IsDone() to check whether the backend has already
completed and, if so, return without applying the update. This is meant
to avoid updating num_remaining_backends_ twice for the same completed
backend.

The problem is that the value of BackendState::IsDone() is updated by
the call to BackendState::ApplyExecStatusReport() that comes after it,
but these operations are not performed atomically, so if there are
two simultaneous calls to UpdateBackendExecStatus(), they can both
call IsDone(), both get 'false', and then proceed to erroneously both
update num_remaining_backends_, hitting a DCHECK.

This patch modifies ApplyExecStatusReport to return true iff this
report transitioned the backend to a done status, and then only
updates num_remaining_backends_ in this case, ensuring it is only
updated once per backend.

Testing:
- Ran test_finst_cancel_when_query_complete 10,000 times without
  hitting the DCHECK (previously, it would hit about once per 300
  runs).

Change-Id: I1528661e5df6d9732ebfeb414576c82ec5c92241
Reviewed-on: http://gerrit.cloudera.org:8080/7577
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b98c621a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b98c621a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b98c621a

Branch: refs/heads/master
Commit: b98c621a801d75dc1e8f9603c858335548d54cfb
Parents: e71e359
Author: Thomas Tauber-Marshall <[email protected]>
Authored: Thu Aug 3 10:09:46 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed Aug 23 02:35:15 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator-backend-state.cc | 25 +++++++----
 be/src/runtime/coordinator-backend-state.h  | 24 +++++-----
 be/src/runtime/coordinator.cc               | 57 ++++++++++--------------
 3 files changed, 53 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b98c621a/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 06fafc5..88f7f21 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -228,20 +228,29 @@ void Coordinator::BackendState::MergeErrorLog(ErrorLogMap* merged) {
   if (error_log_.size() > 0)  MergeErrorMaps(error_log_, merged);
 }
 
-bool Coordinator::BackendState::IsDone() {
-  lock_guard<mutex> l(lock_);
-  return IsDoneInternal();
+void Coordinator::BackendState::LogFirstInProgress(
+    std::vector<Coordinator::BackendState*> backend_states) {
+  for (Coordinator::BackendState* backend_state : backend_states) {
+    lock_guard<mutex> l(backend_state->lock_);
+    if (!backend_state->IsDone()) {
+      VLOG_QUERY << "query_id=" << backend_state->query_id_
+                 << ": first in-progress backend: " << 
backend_state->impalad_address();
+      break;
+    }
+  }
 }
 
-inline bool Coordinator::BackendState::IsDoneInternal() const {
+inline bool Coordinator::BackendState::IsDone() const {
   return num_remaining_instances_ == 0 || !status_.ok();
 }
 
-void Coordinator::BackendState::ApplyExecStatusReport(
+bool Coordinator::BackendState::ApplyExecStatusReport(
     const TReportExecStatusParams& backend_exec_status, ExecSummary* 
exec_summary,
-    ProgressUpdater* scan_range_progress, bool* done) {
+    ProgressUpdater* scan_range_progress) {
   lock_guard<SpinLock> l1(exec_summary->lock);
   lock_guard<mutex> l2(lock_);
+  // If this backend completed previously, don't apply the update.
+  if (IsDone()) return false;
   for (const TFragmentInstanceExecStatus& instance_exec_status:
       backend_exec_status.instance_exec_status) {
     Status instance_status(instance_exec_status.status);
@@ -298,8 +307,8 @@ void Coordinator::BackendState::ApplyExecStatusReport(
     VLOG_FILE << "host=" << host_ << " error log: " << 
PrintErrorMapToString(error_log_);
   }
 
-  *done = IsDoneInternal();
   // TODO: keep backend-wide stopwatch?
+  return IsDone();
 }
 
 void Coordinator::BackendState::UpdateExecStats(
@@ -327,7 +336,7 @@ bool Coordinator::BackendState::Cancel() {
   if (!rpc_sent_) return false;
 
   // don't cancel if it already finished (for any reason)
-  if (IsDoneInternal()) return false;
+  if (IsDone()) return false;
 
   /// If the status is not OK, we still try to cancel - !OK status might mean
   /// communication failure between backend and coordinator, but fragment

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b98c621a/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index d4762af..0846119 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -67,13 +67,14 @@ class Coordinator::BackendState {
       CountingBarrier* rpc_complete_barrier);
 
   /// Update overall execution status, including the instances' exec 
status/profiles
-  /// and the error log. Updates the fragment instances' TExecStats in 
exec_summary
-  /// (exec_summary->nodes.exec_stats) and updates progress_update, and sets
-  /// done to true if all fragment instances completed, regardless of status.
-  /// If any instance reports an error, the overall execution status becomes 
the first
-  /// reported error status and 'done' is set to true.
-  void ApplyExecStatusReport(const TReportExecStatusParams& 
backend_exec_status,
-      ExecSummary* exec_summary, ProgressUpdater* scan_range_progress, bool* 
done);
+  /// and the error log, if this backend is not already done. Updates the fragment
+  /// instances' TExecStats in exec_summary (exec_summary->nodes.exec_stats) and updates
+  /// progress_update. If any instance reports an error, the overall execution status
+  /// becomes the first reported error status. Returns true iff this update changed
+  /// IsDone() from false to true, either because it was the last fragment to complete or
+  /// because it was the first error received.
+  bool ApplyExecStatusReport(const TReportExecStatusParams& 
backend_exec_status,
+      ExecSummary* exec_summary, ProgressUpdater* scan_range_progress);
 
   /// Update completion_times, rates, and avg_profile for all fragment_stats.
   void UpdateExecStats(const std::vector<FragmentStats*>& fragment_stats);
@@ -108,8 +109,9 @@ class Coordinator::BackendState {
   /// Only valid after Exec().
   int64_t rpc_latency() const { return rpc_latency_; }
 
-  /// Return true if execution at this backend is done.
-  bool IsDone();
+  /// Print host/port info for the first backend that's still in progress as a
+  /// debugging aid for backend deadlocks.
+  static void LogFirstInProgress(std::vector<BackendState*> backend_states);
 
  private:
   /// Execution stats for a single fragment instance.
@@ -217,8 +219,8 @@ class Coordinator::BackendState {
       const FilterRoutingTable& filter_routing_table,
       TExecQueryFInstancesParams* rpc_params);
 
-  /// Return true if execution at this backend is done. Doesn't acquire lock.
-  bool IsDoneInternal() const;
+  /// Return true if execution at this backend is done. Caller must hold lock_.
+  bool IsDone() const;
 };
 
 /// Per fragment execution statistics.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b98c621a/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 551e814..a9936ad 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -931,59 +931,48 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
             params.coord_state_idx, backend_states_.size() - 1));
   }
   BackendState* backend_state = backend_states_[params.coord_state_idx];
-  // ignore stray exec reports if we're already done, otherwise we lose
-  // track of num_remaining_backends_
-  if (backend_state->IsDone()) return Status::OK();
   // TODO: return here if returned_all_results_?
   // TODO: return CANCELLED in that case? Although that makes the cancellation 
propagation
   // path more irregular.
 
-  bool done;
-  backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_, 
&done);
-
   // TODO: only do this when the sink is done; probably missing a done field
   // in TReportExecStatus for that
   if (params.__isset.insert_exec_status) {
     UpdateInsertExecStatus(params.insert_exec_status);
   }
 
-  // for now, abort the query if we see any error except if 
returned_all_results_ is true
-  // (UpdateStatus() initiates cancellation, if it hasn't already been)
-  // TODO: clarify control flow here, it's unclear we should even process this 
status
-  // report if returned_all_results_ is true
-  TUniqueId failed_instance_id;
-  Status status = backend_state->GetStatus(&failed_instance_id);
-  if (!status.ok() && !returned_all_results_) {
-    Status ignored = UpdateStatus(status, failed_instance_id,
-        TNetworkAddressToString(backend_state->impalad_address()));
-    return Status::OK();
-  }
-
-  // If all results have been returned, return a cancelled status to force the 
fragment
-  // instance to stop executing.
-  if (!done && returned_all_results_) return Status::CANCELLED;
+  if (backend_state->ApplyExecStatusReport(params, &exec_summary_, 
&progress_)) {
+    // This report made this backend done, so update the status and
+    // num_remaining_backends_.
+
+    // for now, abort the query if we see any error except if 
returned_all_results_ is
+    // true (UpdateStatus() initiates cancellation, if it hasn't already been)
+    // TODO: clarify control flow here, it's unclear we should even process 
this status
+    // report if returned_all_results_ is true
+    TUniqueId failed_instance_id;
+    Status status = backend_state->GetStatus(&failed_instance_id);
+    if (!status.ok() && !returned_all_results_) {
+      Status ignored = UpdateStatus(status, failed_instance_id,
+          TNetworkAddressToString(backend_state->impalad_address()));
+      return Status::OK();
+    }
 
-  if (done) {
     lock_guard<mutex> l(lock_);
     DCHECK_GT(num_remaining_backends_, 0);
-    VLOG_QUERY << "Backend completed: "
-        << " host=" << backend_state->impalad_address()
-        << " remaining=" << num_remaining_backends_ - 1;
     if (VLOG_QUERY_IS_ON && num_remaining_backends_ > 1) {
-      // print host/port info for the first backend that's still in progress 
as a
-      // debugging aid for backend deadlocks
-      for (BackendState* backend_state: backend_states_) {
-        if (!backend_state->IsDone()) {
-          VLOG_QUERY << "query_id=" << query_id() << ": first in-progress 
backend: "
-                     << backend_state->impalad_address();
-          break;
-        }
-      }
+      VLOG_QUERY << "Backend completed: "
+          << " host=" << backend_state->impalad_address()
+          << " remaining=" << num_remaining_backends_ - 1;
+      BackendState::LogFirstInProgress(backend_states_);
     }
     if (--num_remaining_backends_ == 0 || !status.ok()) {
       backend_completion_cv_.notify_all();
     }
+    return Status::OK();
   }
+  // If all results have been returned, return a cancelled status to force the 
fragment
+  // instance to stop executing.
+  if (returned_all_results_) return Status::CANCELLED;
 
   return Status::OK();
 }

Reply via email to