Repository: incubator-impala Updated Branches: refs/heads/master e71e359af -> ed87c4060
IMPALA-5749: coordinator race hits DCHECK 'num_remaining_backends_ > 0' In Coordinator::UpdateBackendExecStatus(), we check whether the backend has already completed by calling BackendState::IsDone(). If it has, we return without applying the update, so that num_remaining_backends_ is not decremented twice for the same completed backend. The problem is that the value of BackendState::IsDone() is only changed by the subsequent call to BackendState::ApplyExecStatusReport(), and these two operations are not performed atomically. If there are two simultaneous calls to UpdateBackendExecStatus(), both can call IsDone(), both get 'false', and both then erroneously update num_remaining_backends_, hitting the DCHECK. This patch moves the IsDone() check into ApplyExecStatusReport(), where it is performed under the backend's lock. ApplyExecStatusReport() now returns true iff this report transitioned the backend to a done status. num_remaining_backends_ is only updated in that case, ensuring it is updated exactly once per backend. Testing: - Ran test_finst_cancel_when_query_complete 10,000 times without hitting the DCHECK (previously the DCHECK was hit about once per 300 runs). 
Change-Id: I1528661e5df6d9732ebfeb414576c82ec5c92241 Reviewed-on: http://gerrit.cloudera.org:8080/7577 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b98c621a Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b98c621a Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b98c621a Branch: refs/heads/master Commit: b98c621a801d75dc1e8f9603c858335548d54cfb Parents: e71e359 Author: Thomas Tauber-Marshall <[email protected]> Authored: Thu Aug 3 10:09:46 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Aug 23 02:35:15 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/coordinator-backend-state.cc | 25 +++++++---- be/src/runtime/coordinator-backend-state.h | 24 +++++----- be/src/runtime/coordinator.cc | 57 ++++++++++-------------- 3 files changed, 53 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b98c621a/be/src/runtime/coordinator-backend-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc index 06fafc5..88f7f21 100644 --- a/be/src/runtime/coordinator-backend-state.cc +++ b/be/src/runtime/coordinator-backend-state.cc @@ -228,20 +228,29 @@ void Coordinator::BackendState::MergeErrorLog(ErrorLogMap* merged) { if (error_log_.size() > 0) MergeErrorMaps(error_log_, merged); } -bool Coordinator::BackendState::IsDone() { - lock_guard<mutex> l(lock_); - return IsDoneInternal(); +void Coordinator::BackendState::LogFirstInProgress( + std::vector<Coordinator::BackendState*> backend_states) { + for (Coordinator::BackendState* backend_state : backend_states) { + 
lock_guard<mutex> l(backend_state->lock_); + if (!backend_state->IsDone()) { + VLOG_QUERY << "query_id=" << backend_state->query_id_ + << ": first in-progress backend: " << backend_state->impalad_address(); + break; + } + } } -inline bool Coordinator::BackendState::IsDoneInternal() const { +inline bool Coordinator::BackendState::IsDone() const { return num_remaining_instances_ == 0 || !status_.ok(); } -void Coordinator::BackendState::ApplyExecStatusReport( +bool Coordinator::BackendState::ApplyExecStatusReport( const TReportExecStatusParams& backend_exec_status, ExecSummary* exec_summary, - ProgressUpdater* scan_range_progress, bool* done) { + ProgressUpdater* scan_range_progress) { lock_guard<SpinLock> l1(exec_summary->lock); lock_guard<mutex> l2(lock_); + // If this backend completed previously, don't apply the update. + if (IsDone()) return false; for (const TFragmentInstanceExecStatus& instance_exec_status: backend_exec_status.instance_exec_status) { Status instance_status(instance_exec_status.status); @@ -298,8 +307,8 @@ void Coordinator::BackendState::ApplyExecStatusReport( VLOG_FILE << "host=" << host_ << " error log: " << PrintErrorMapToString(error_log_); } - *done = IsDoneInternal(); // TODO: keep backend-wide stopwatch? 
+ return IsDone(); } void Coordinator::BackendState::UpdateExecStats( @@ -327,7 +336,7 @@ bool Coordinator::BackendState::Cancel() { if (!rpc_sent_) return false; // don't cancel if it already finished (for any reason) - if (IsDoneInternal()) return false; + if (IsDone()) return false; /// If the status is not OK, we still try to cancel - !OK status might mean /// communication failure between backend and coordinator, but fragment http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b98c621a/be/src/runtime/coordinator-backend-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h index d4762af..0846119 100644 --- a/be/src/runtime/coordinator-backend-state.h +++ b/be/src/runtime/coordinator-backend-state.h @@ -67,13 +67,14 @@ class Coordinator::BackendState { CountingBarrier* rpc_complete_barrier); /// Update overall execution status, including the instances' exec status/profiles - /// and the error log. Updates the fragment instances' TExecStats in exec_summary - /// (exec_summary->nodes.exec_stats) and updates progress_update, and sets - /// done to true if all fragment instances completed, regardless of status. - /// If any instance reports an error, the overall execution status becomes the first - /// reported error status and 'done' is set to true. - void ApplyExecStatusReport(const TReportExecStatusParams& backend_exec_status, - ExecSummary* exec_summary, ProgressUpdater* scan_range_progress, bool* done); + /// and the error log, if this backend is not already done. Updates the fragment + /// instances' TExecStats in exec_summary (exec_summary->nodes.exec_stats) and updates + /// progress_update. If any instance reports an error, the overall execution status + /// becomes the first reported error status. 
Returns true iff this update changed + /// IsDone() from false to true, either because it was the last fragment to complete or + /// because it was the first error received. + bool ApplyExecStatusReport(const TReportExecStatusParams& backend_exec_status, + ExecSummary* exec_summary, ProgressUpdater* scan_range_progress); /// Update completion_times, rates, and avg_profile for all fragment_stats. void UpdateExecStats(const std::vector<FragmentStats*>& fragment_stats); @@ -108,8 +109,9 @@ class Coordinator::BackendState { /// Only valid after Exec(). int64_t rpc_latency() const { return rpc_latency_; } - /// Return true if execution at this backend is done. - bool IsDone(); + /// Print host/port info for the first backend that's still in progress as a + /// debugging aid for backend deadlocks. + static void LogFirstInProgress(std::vector<BackendState*> backend_states); private: /// Execution stats for a single fragment instance. @@ -217,8 +219,8 @@ class Coordinator::BackendState { const FilterRoutingTable& filter_routing_table, TExecQueryFInstancesParams* rpc_params); - /// Return true if execution at this backend is done. Doesn't acquire lock. - bool IsDoneInternal() const; + /// Return true if execution at this backend is done. Caller must hold lock_. + bool IsDone() const; }; /// Per fragment execution statistics. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b98c621a/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 551e814..a9936ad 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -931,59 +931,48 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param params.coord_state_idx, backend_states_.size() - 1)); } BackendState* backend_state = backend_states_[params.coord_state_idx]; - // ignore stray exec reports if we're already done, otherwise we lose - // track of num_remaining_backends_ - if (backend_state->IsDone()) return Status::OK(); // TODO: return here if returned_all_results_? // TODO: return CANCELLED in that case? Although that makes the cancellation propagation // path more irregular. - bool done; - backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_, &done); - // TODO: only do this when the sink is done; probably missing a done field // in TReportExecStatus for that if (params.__isset.insert_exec_status) { UpdateInsertExecStatus(params.insert_exec_status); } - // for now, abort the query if we see any error except if returned_all_results_ is true - // (UpdateStatus() initiates cancellation, if it hasn't already been) - // TODO: clarify control flow here, it's unclear we should even process this status - // report if returned_all_results_ is true - TUniqueId failed_instance_id; - Status status = backend_state->GetStatus(&failed_instance_id); - if (!status.ok() && !returned_all_results_) { - Status ignored = UpdateStatus(status, failed_instance_id, - TNetworkAddressToString(backend_state->impalad_address())); - return Status::OK(); - } - - // If all results have been returned, return a cancelled status to force the fragment - // instance to stop executing. 
- if (!done && returned_all_results_) return Status::CANCELLED; + if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) { + // This report made this backend done, so update the status and + // num_remaining_backends_. + + // for now, abort the query if we see any error except if returned_all_results_ is + // true (UpdateStatus() initiates cancellation, if it hasn't already been) + // TODO: clarify control flow here, it's unclear we should even process this status + // report if returned_all_results_ is true + TUniqueId failed_instance_id; + Status status = backend_state->GetStatus(&failed_instance_id); + if (!status.ok() && !returned_all_results_) { + Status ignored = UpdateStatus(status, failed_instance_id, + TNetworkAddressToString(backend_state->impalad_address())); + return Status::OK(); + } - if (done) { lock_guard<mutex> l(lock_); DCHECK_GT(num_remaining_backends_, 0); - VLOG_QUERY << "Backend completed: " - << " host=" << backend_state->impalad_address() - << " remaining=" << num_remaining_backends_ - 1; if (VLOG_QUERY_IS_ON && num_remaining_backends_ > 1) { - // print host/port info for the first backend that's still in progress as a - // debugging aid for backend deadlocks - for (BackendState* backend_state: backend_states_) { - if (!backend_state->IsDone()) { - VLOG_QUERY << "query_id=" << query_id() << ": first in-progress backend: " - << backend_state->impalad_address(); - break; - } - } + VLOG_QUERY << "Backend completed: " + << " host=" << backend_state->impalad_address() + << " remaining=" << num_remaining_backends_ - 1; + BackendState::LogFirstInProgress(backend_states_); } if (--num_remaining_backends_ == 0 || !status.ok()) { backend_completion_cv_.notify_all(); } + return Status::OK(); } + // If all results have been returned, return a cancelled status to force the fragment + // instance to stop executing. + if (returned_all_results_) return Status::CANCELLED; return Status::OK(); }
