Repository: impala Updated Branches: refs/heads/master 3661100fa -> 75a10a3dd
IMPALA-7033/IMPALA-7030: Backout suspected change leading to crash Revert "IMPALA-5384, part 2: Simplify Coordinator locking and clarify state" This reverts commit 6ca87e46736a1e591ed7d7d5fee05b4b4d2fbb50. Change-Id: Idc63006e6e04130b2873a6a9730e434c563327c5 Reviewed-on: http://gerrit.cloudera.org:8080/10412 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4fab4288 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4fab4288 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4fab4288 Branch: refs/heads/master Commit: 4fab42883184c247b951dba4237f4303c502d410 Parents: 3661100 Author: Dan Hecht <[email protected]> Authored: Tue May 15 11:36:15 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Tue May 15 22:24:26 2018 +0000 ---------------------------------------------------------------------- be/src/runtime/coordinator-backend-state.h | 8 - be/src/runtime/coordinator.cc | 424 +++++++++++++----------- be/src/runtime/coordinator.h | 330 +++++++++--------- be/src/service/client-request-state.cc | 2 +- be/src/service/impala-server.h | 5 + be/src/util/counting-barrier.h | 21 +- 6 files changed, 395 insertions(+), 395 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/4fab4288/be/src/runtime/coordinator-backend-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h index e7af2e2..d2f122c 100644 --- a/be/src/runtime/coordinator-backend-state.h +++ b/be/src/runtime/coordinator-backend-state.h @@ -21,17 +21,9 @@ #include <vector> #include <unordered_set> -#include <boost/accumulators/accumulators.hpp> -#include <boost/accumulators/statistics/max.hpp> -#include <boost/accumulators/statistics/mean.hpp> -#include <boost/accumulators/statistics/median.hpp> -#include <boost/accumulators/statistics/min.hpp> -#include <boost/accumulators/statistics/stats.hpp> -#include <boost/accumulators/statistics/variance.hpp> #include <boost/thread/mutex.hpp> #include "runtime/coordinator.h" -#include "scheduling/query-schedule.h" #include "util/progress-updater.h" #include "util/stopwatch.h" #include "util/runtime-profile.h" http://git-wip-us.apache.org/repos/asf/impala/blob/4fab4288/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 91f2e29..d87971e 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -25,7 +25,6 @@ #include <boost/algorithm/string.hpp> #include <gutil/strings/substitute.h> -#include "common/hdfs.h" #include "exec/data-sink.h" #include "exec/plan-root-sink.h" #include "gen-cpp/ImpalaInternalService.h" @@ -40,7 +39,6 @@ #include "runtime/query-state.h" #include "scheduling/admission-controller.h" #include "scheduling/scheduler.h" -#include "scheduling/query-schedule.h" #include "util/bloom-filter.h" #include "util/counting-barrier.h" #include "util/hdfs-bulk-ops.h" @@ -53,13 +51,16 @@ using namespace apache::thrift; using namespace rapidjson; +using namespace strings; using boost::algorithm::iequals; using boost::algorithm::is_any_of; using boost::algorithm::join; using boost::algorithm::token_compress_on; using boost::algorithm::split; using boost::filesystem::path; +using std::unique_ptr; +DECLARE_int32(be_port); DECLARE_string(hostname); using namespace impala; @@ -75,9 +76,11 @@ Coordinator::Coordinator( query_events_(events) {} Coordinator::~Coordinator() { - // Must have entered a terminal exec state guaranteeing resources were released. - DCHECK_NE(exec_state_, ExecState::EXECUTING); - // Release the coordinator's reference to the query control structures. + DCHECK(released_exec_resources_) + << "ReleaseExecResources() must be called before Coordinator is destroyed"; + DCHECK(released_admission_control_resources_) + << "ReleaseAdmissionControlResources() must be called before Coordinator is " + << "destroyed"; if (query_state_ != nullptr) { ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_); } @@ -106,6 +109,12 @@ Status Coordinator::Exec() { bool is_mt_execution = request.query_ctx.client_request.query_options.mt_dop > 0; if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF; + // to keep things simple, make async Cancel() calls wait until plan fragment + // execution has been initiated, otherwise we might try to cancel fragment + // execution at Impala daemons where it hasn't even started + // TODO: revisit this, it may not be true anymore + lock_guard<mutex> l(lock_); + query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx()); query_state_->AcquireExecResourceRefcount(); // Decremented in ReleaseExecResources(). filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker( @@ -129,9 +138,9 @@ Status Coordinator::Exec() { InitFilterRoutingTable(); } - // At this point, all static setup is done and all structures are initialized. Only - // runtime-related state changes past this point (examples: fragment instance - // profiles, etc.) + // At this point, all static setup is done and all structures are initialized. + // Only runtime-related state changes past this point (examples: + // num_remaining_backends_, fragment instance profiles, etc.) StartBackendExec(); RETURN_IF_ERROR(FinishBackendStartup()); @@ -146,7 +155,7 @@ Status Coordinator::Exec() { // which means we failed Prepare Status prepare_status = query_state_->WaitForPrepare(); DCHECK(!prepare_status.ok()); - return UpdateExecState(prepare_status, nullptr, FLAGS_hostname); + return prepare_status; } // When GetFInstanceState() returns the coordinator instance, the Prepare phase @@ -160,6 +169,7 @@ Status Coordinator::Exec() { coord_sink_ = coord_instance_->root_sink(); DCHECK(coord_sink_ != nullptr); } + return Status::OK(); } @@ -198,8 +208,6 @@ void Coordinator::InitFragmentStats() { void Coordinator::InitBackendStates() { int num_backends = schedule_.per_backend_exec_params().size(); DCHECK_GT(num_backends, 0); - - lock_guard<SpinLock> l(backend_states_init_lock_); backend_states_.resize(num_backends); RuntimeProfile::Counter* num_backends_counter = @@ -207,13 +215,19 @@ void Coordinator::InitBackendStates() { num_backends_counter->Set(num_backends); // create BackendStates + bool has_coord_fragment = schedule_.GetCoordFragment() != nullptr; + const TNetworkAddress& coord_address = ExecEnv::GetInstance()->backend_address(); int backend_idx = 0; for (const auto& entry: schedule_.per_backend_exec_params()) { + if (has_coord_fragment && coord_address == entry.first) { + coord_backend_idx_ = backend_idx; + } BackendState* backend_state = obj_pool()->Add( new BackendState(query_id(), backend_idx, filter_mode_)); backend_state->Init(entry.second, fragment_stats_, obj_pool()); backend_states_[backend_idx++] = backend_state; } + DCHECK(!has_coord_fragment || coord_backend_idx_ != -1); } void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) { @@ -327,8 +341,8 @@ void Coordinator::InitFilterRoutingTable() { void Coordinator::StartBackendExec() { int num_backends = backend_states_.size(); - exec_rpcs_complete_barrier_.reset(new CountingBarrier(num_backends)); - backend_exec_complete_barrier_.reset(new CountingBarrier(num_backends)); + exec_complete_barrier_.reset(new CountingBarrier(num_backends)); + num_remaining_backends_ = num_backends; DebugOptions debug_options(schedule_.query_options()); @@ -340,11 +354,11 @@ void Coordinator::StartBackendExec() { ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer( [backend_state, this, &debug_options]() { backend_state->Exec(query_ctx(), debug_options, filter_routing_table_, - exec_rpcs_complete_barrier_.get()); + exec_complete_barrier_.get()); }); } - exec_rpcs_complete_barrier_->Wait(); + exec_complete_barrier_->Wait(); VLOG_QUERY << "started execution on " << num_backends << " backends for query_id=" << PrintId(query_id()); query_events_->MarkEvent( @@ -353,24 +367,26 @@ void Coordinator::StartBackendExec() { } Status Coordinator::FinishBackendStartup() { + Status status = Status::OK(); const TMetricDef& def = MakeTMetricDef("backend-startup-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS); // Capture up to 30 minutes of start-up times, in ms, with 4 s.f. accuracy. HistogramMetric latencies(def, 30 * 60 * 1000, 4); - Status status = Status::OK(); - string error_hostname; for (BackendState* backend_state: backend_states_) { // preserve the first non-OK, if there is one Status backend_status = backend_state->GetStatus(); - if (!backend_status.ok() && status.ok()) { - status = backend_status; - error_hostname = backend_state->impalad_address().hostname; - } + if (!backend_status.ok() && status.ok()) status = backend_status; latencies.Update(backend_state->rpc_latency()); } + query_profile_->AddInfoString( "Backend startup latencies", latencies.ToHumanReadable()); - return UpdateExecState(status, nullptr, error_hostname); + + if (!status.ok()) { + query_status_ = status; + CancelInternal(); + } + return status; } string Coordinator::FilterDebugString() { @@ -430,115 +446,40 @@ string Coordinator::FilterDebugString() { return Substitute("\n$0", table_printer.ToString()); } -const char* Coordinator::ExecStateToString(const ExecState state) { - static const unordered_map<ExecState, const char *> exec_state_to_str{ - {ExecState::EXECUTING, "EXECUTING"}, - {ExecState::RETURNED_RESULTS, "RETURNED_RESULTS"}, - {ExecState::CANCELLED, "CANCELLED"}, - {ExecState::ERROR, "ERROR"}}; - return exec_state_to_str.at(state); +Status Coordinator::GetStatus() { + lock_guard<mutex> l(lock_); + return query_status_; } -Status Coordinator::SetNonErrorTerminalState(const ExecState state) { - DCHECK(state == ExecState::RETURNED_RESULTS || state == ExecState::CANCELLED); - Status ret_status; - { - lock_guard<SpinLock> l(exec_state_lock_); - // May have already entered a terminal state, in which case nothing to do. - if (exec_state_ != ExecState::EXECUTING) return exec_status_; - DCHECK(exec_status_.ok()) << exec_status_; - exec_state_ = state; - if (state == ExecState::CANCELLED) exec_status_ = Status::CANCELLED; - ret_status = exec_status_; - } - VLOG_QUERY << Substitute("ExecState: query id=$0 execution $1", PrintId(query_id()), - state == ExecState::CANCELLED ? "cancelled" : "completed"); - HandleExecStateTransition(ExecState::EXECUTING, state); - return ret_status; -} - -Status Coordinator::UpdateExecState(const Status& status, - const TUniqueId* failed_finst, const string& instance_hostname) { - Status ret_status; - ExecState old_state, new_state; +Status Coordinator::UpdateStatus(const Status& status, const string& backend_hostname, + bool is_fragment_failure, const TUniqueId& instance_id) { { - lock_guard<SpinLock> l(exec_state_lock_); - old_state = exec_state_; - if (old_state == ExecState::EXECUTING) { - DCHECK(exec_status_.ok()) << exec_status_; - if (!status.ok()) { - // Error while executing - go to ERROR state. - exec_status_ = status; - exec_state_ = ExecState::ERROR; - } - } else if (old_state == ExecState::RETURNED_RESULTS) { - // Already returned all results. Leave exec status as ok, stay in this state. - DCHECK(exec_status_.ok()) << exec_status_; - } else if (old_state == ExecState::CANCELLED) { - // Client requested cancellation already, stay in this state. Ignores errors - // after requested cancellations. - DCHECK(exec_status_.IsCancelled()) << exec_status_; - } else { - // Already in the ERROR state, stay in this state but update status to be the - // first non-cancelled status. - DCHECK_EQ(old_state, ExecState::ERROR); - DCHECK(!exec_status_.ok()); - if (!status.ok() && !status.IsCancelled() && exec_status_.IsCancelled()) { - exec_status_ = status; - } - } - new_state = exec_state_; - ret_status = exec_status_; - } - // Log interesting status: a non-cancelled error or a cancellation if was executing. - if (!status.ok() && (!status.IsCancelled() || old_state == ExecState::EXECUTING)) { - VLOG_QUERY << Substitute( - "ExecState: query id=$0 finstance=$1 on host=$2 ($3 -> $4) status=$5", - PrintId(query_id()), failed_finst != nullptr ? PrintId(*failed_finst) : "N/A", - instance_hostname, ExecStateToString(old_state), ExecStateToString(new_state), - status.GetDetail()); - } - // After dropping the lock, apply the state transition (if any) side-effects. - HandleExecStateTransition(old_state, new_state); - return ret_status; -} - -bool Coordinator::ReturnedAllResults() { - lock_guard<SpinLock> l(exec_state_lock_); - return exec_state_ == ExecState::RETURNED_RESULTS; -} - -void Coordinator::HandleExecStateTransition( - const ExecState old_state, const ExecState new_state) { - static const unordered_map<ExecState, const char *> exec_state_to_event{ - {ExecState::EXECUTING, "Executing"}, - {ExecState::RETURNED_RESULTS, "Last row fetched"}, - {ExecState::CANCELLED, "Execution cancelled"}, - {ExecState::ERROR, "Execution error"}}; - if (old_state == new_state) return; - // Once we enter a terminal state, we stay there, guaranteeing this code runs only once. - DCHECK_EQ(old_state, ExecState::EXECUTING); - // Should never transition to the initial state. - DCHECK_NE(new_state, ExecState::EXECUTING); - - query_events_->MarkEvent(exec_state_to_event.at(new_state)); - // TODO: IMPALA-7011 is this needed? This will also happen on the "backend" side of - // cancel rpc. And in the case of EOS, sink already knows this. - if (coord_sink_ != nullptr) coord_sink_->CloseConsumer(); - // This thread won the race to transitioning into a terminal state - terminate - // execution and release resources. - ReleaseExecResources(); - if (new_state == ExecState::RETURNED_RESULTS) { - // TODO: IMPALA-6984: cancel all backends in this case too. - WaitForBackends(); + lock_guard<mutex> l(lock_); + + // The query is done and we are just waiting for backends to clean up. + // Ignore their cancelled updates. + if (returned_all_results_ && status.IsCancelled()) return query_status_; + + // nothing to update + if (status.ok()) return query_status_; + + // don't override an error status; also, cancellation has already started + if (!query_status_.ok()) return query_status_; + + query_status_ = status; + CancelInternal(); + } + + if (is_fragment_failure) { + // Log the id of the fragment that first failed so we can track it down more easily. + VLOG_QUERY << "query_id=" << PrintId(query_id()) + << " failed because fragment_instance_id=" << PrintId(instance_id) + << " on host=" << backend_hostname << " failed."; } else { - CancelBackends(); + VLOG_QUERY << "query_id=" << PrintId(query_id()) << " failed due to error on host=" + << backend_hostname; } - ReleaseAdmissionControlResources(); - // Can compute summary only after we stop accepting reports from the backends. Both - // WaitForBackends() and CancelBackends() ensures that. - // TODO: should move this off of the query execution path? - ComputeQuerySummary(); + return query_status_; } Status Coordinator::FinalizeHdfsInsert() { @@ -550,7 +491,7 @@ Status Coordinator::FinalizeHdfsInsert() { VLOG_QUERY << "Finalizing query: " << PrintId(query_id()); SCOPED_TIMER(finalization_timer_); - Status return_status = UpdateExecState(Status::OK(), nullptr, FLAGS_hostname); + Status return_status = GetStatus(); if (return_status.ok()) { HdfsTableDescriptor* hdfs_table; RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx().desc_tbl, @@ -576,13 +517,22 @@ Status Coordinator::FinalizeHdfsInsert() { return return_status; } -void Coordinator::WaitForBackends() { - int32_t num_remaining = backend_exec_complete_barrier_->pending(); - if (num_remaining > 0) { - VLOG_QUERY << "Coordinator waiting for backends to finish, " << num_remaining - << " remaining. query_id=" << PrintId(query_id()); - backend_exec_complete_barrier_->Wait(); +Status Coordinator::WaitForBackendCompletion() { + unique_lock<mutex> l(lock_); + while (num_remaining_backends_ > 0 && query_status_.ok()) { + VLOG_QUERY << "Coordinator waiting for backends to finish, " + << num_remaining_backends_ << " remaining. query_id=" + << PrintId(query_id()); + backend_completion_cv_.Wait(l); } + if (query_status_.ok()) { + VLOG_QUERY << "All backends finished successfully. query_id=" << PrintId(query_id()); + } else { + VLOG_QUERY << "All backends finished due to one or more errors. query_id=" + << PrintId(query_id()) << ". " << query_status_.GetDetail(); + } + + return query_status_; } Status Coordinator::Wait() { @@ -593,22 +543,34 @@ Status Coordinator::Wait() { if (stmt_type_ == TStmtType::QUERY) { DCHECK(coord_instance_ != nullptr); - return UpdateExecState(coord_instance_->WaitForOpen(), - &coord_instance_->runtime_state()->fragment_instance_id(), FLAGS_hostname); + return UpdateStatus(coord_instance_->WaitForOpen(), FLAGS_hostname, true, + runtime_state()->fragment_instance_id()); } + DCHECK_EQ(stmt_type_, TStmtType::DML); - // DML finalization can only happen when all backends have completed all side-effects - // and reported relevant state. - WaitForBackends(); - if (finalize_params() != nullptr) { - RETURN_IF_ERROR(UpdateExecState( - FinalizeHdfsInsert(), nullptr, FLAGS_hostname)); - } - // DML requests are finished at this point. - RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS)); + // Query finalization can only happen when all backends have reported relevant + // state. They only have relevant state to report in the parallel INSERT case, + // otherwise all the relevant state is from the coordinator fragment which will be + // available after Open() returns. Ignore the returned status if finalization is + // required., since FinalizeHdfsInsert() will pick it up and needs to execute + // regardless. + Status status = WaitForBackendCompletion(); + if (finalize_params() == nullptr && !status.ok()) return status; + + // Execution of query fragments has finished. We don't need to hold onto query execution + // resources while we finalize the query. + ReleaseExecResources(); + // Query finalization is required only for HDFS table sinks + if (finalize_params() != nullptr) RETURN_IF_ERROR(FinalizeHdfsInsert()); + // Release admission control resources after we'd done the potentially heavyweight + // finalization. + ReleaseAdmissionControlResources(); + query_profile_->AddInfoString( "DML Stats", dml_exec_state_.OutputPartitionStats("\n")); - return Status::OK(); + // For DML queries, when Wait is done, the query is complete. + ComputeQuerySummary(); + return status; } Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) { @@ -616,54 +578,88 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) { DCHECK(has_called_wait_); SCOPED_TIMER(query_profile_->total_time_counter()); - // exec_state_lock_ not needed here though since this path won't execute concurrently - // with itself or DML finalization. - if (exec_state_ == ExecState::RETURNED_RESULTS) { - // Nothing left to do: already in a terminal state and no more results. + if (returned_all_results_) { + // May be called after the first time we set *eos. Re-set *eos and return here; + // already torn-down coord_sink_ so no more work to do. *eos = true; return Status::OK(); } - DCHECK(coord_instance_ != nullptr) << "Exec() should be called first"; - DCHECK(coord_sink_ != nullptr) << "Exec() should be called first"; - RuntimeState* runtime_state = coord_instance_->runtime_state(); - Status status = coord_sink_->GetNext(runtime_state, results, max_rows, eos); - RETURN_IF_ERROR(UpdateExecState( - status, &runtime_state->fragment_instance_id(), FLAGS_hostname)); - if (*eos) RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS)); + DCHECK(coord_sink_ != nullptr) + << "GetNext() called without result sink. Perhaps Prepare() failed and was not " + << "checked?"; + Status status = coord_sink_->GetNext(runtime_state(), results, max_rows, eos); + + // if there was an error, we need to return the query's error status rather than + // the status we just got back from the local executor (which may well be CANCELLED + // in that case). Coordinator fragment failed in this case so we log the query_id. + RETURN_IF_ERROR(UpdateStatus(status, FLAGS_hostname, true, + runtime_state()->fragment_instance_id())); + + if (*eos) { + returned_all_results_ = true; + query_events_->MarkEvent("Last row fetched"); + // release query execution resources here, since we won't be fetching more result rows + ReleaseExecResources(); + // wait for all backends to complete before computing the summary + // TODO: relocate this so GetNext() won't have to wait for backends to complete? + RETURN_IF_ERROR(WaitForBackendCompletion()); + // Release admission control resources after backends are finished. + ReleaseAdmissionControlResources(); + // if the query completed successfully, compute the summary + if (query_status_.ok()) ComputeQuerySummary(); + } + return Status::OK(); } -void Coordinator::Cancel() { - // Illegal to call Cancel() before Exec() returns, so there's no danger of the cancel - // RPC passing the exec RPC. - DCHECK(exec_rpcs_complete_barrier_ != nullptr && - exec_rpcs_complete_barrier_->pending() <= 0) << "Exec() must be called first"; - discard_result(SetNonErrorTerminalState(ExecState::CANCELLED)); +void Coordinator::Cancel(const Status* cause) { + lock_guard<mutex> l(lock_); + // if the query status indicates an error, cancellation has already been initiated; + // prevent others from cancelling a second time + if (!query_status_.ok()) return; + + // TODO: This should default to OK(), not CANCELLED if there is no cause (or callers + // should explicitly pass Status::OK()). Fragment instances may be cancelled at the end + // of a successful query. Need to clean up relationship between query_status_ here and + // in QueryExecState. See IMPALA-4279. + query_status_ = (cause != nullptr && !cause->ok()) ? *cause : Status::CANCELLED; + CancelInternal(); } -void Coordinator::CancelBackends() { +void Coordinator::CancelInternal() { + VLOG_QUERY << "Cancel() query_id=" << PrintId(query_id()); + // TODO: remove when restructuring cancellation, which should happen automatically + // as soon as the coordinator knows that the query is finished + DCHECK(!query_status_.ok()); + int num_cancelled = 0; for (BackendState* backend_state: backend_states_) { DCHECK(backend_state != nullptr); if (backend_state->Cancel()) ++num_cancelled; } - backend_exec_complete_barrier_->NotifyRemaining(); - VLOG_QUERY << Substitute( "CancelBackends() query_id=$0, tried to cancel $1 backends", PrintId(query_id()), num_cancelled); + backend_completion_cv_.NotifyAll(); + + ReleaseExecResourcesLocked(); + ReleaseAdmissionControlResourcesLocked(); + // Report the summary with whatever progress the query made before being cancelled. + ComputeQuerySummary(); } Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& params) { - VLOG_FILE << "UpdateBackendExecStatus() query_id=" << PrintId(query_id()) - << " backend_idx=" << params.coord_state_idx; + VLOG_FILE << "UpdateBackendExecStatus() backend_idx=" << params.coord_state_idx; if (params.coord_state_idx >= backend_states_.size()) { return Status(TErrorCode::INTERNAL_ERROR, Substitute("Unknown backend index $0 (max known: $1)", params.coord_state_idx, backend_states_.size() - 1)); } BackendState* backend_state = backend_states_[params.coord_state_idx]; + // TODO: return here if returned_all_results_? + // TODO: return CANCELLED in that case? Although that makes the cancellation propagation + // path more irregular. // TODO: only do this when the sink is done; probably missing a done field // in TReportExecStatus for that @@ -672,30 +668,46 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param } if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) { - // This backend execution has completed. + // This report made this backend done, so update the status and + // num_remaining_backends_. + + // for now, abort the query if we see any error except if returned_all_results_ is + // true (UpdateStatus() initiates cancellation, if it hasn't already been) + // TODO: clarify control flow here, it's unclear we should even process this status + // report if returned_all_results_ is true bool is_fragment_failure; TUniqueId failed_instance_id; Status status = backend_state->GetStatus(&is_fragment_failure, &failed_instance_id); - int pending_backends = backend_exec_complete_barrier_->Notify(); - if (VLOG_QUERY_IS_ON && pending_backends >= 0) { - VLOG_QUERY << "Backend completed:" - << " host=" << TNetworkAddressToString(backend_state->impalad_address()) - << " remaining=" << pending_backends - << " query_id=" << PrintId(query_id()); + if (!status.ok() && !returned_all_results_) { + Status ignored = + UpdateStatus(status, TNetworkAddressToString(backend_state->impalad_address()), + is_fragment_failure, failed_instance_id); + return Status::OK(); + } + + lock_guard<mutex> l(lock_); + DCHECK_GT(num_remaining_backends_, 0); + if (VLOG_QUERY_IS_ON && num_remaining_backends_ > 1) { + VLOG_QUERY << "Backend completed: " + << " host=" << TNetworkAddressToString(backend_state->impalad_address()) + << " remaining=" << num_remaining_backends_ - 1 + << " query_id=" << PrintId(query_id()); BackendState::LogFirstInProgress(backend_states_); } - if (!status.ok()) { - // TODO: IMPALA-5119: call UpdateExecState() asynchronously rather than - // from within this RPC handler (since it can make RPCs). - discard_result(UpdateExecState(status, - is_fragment_failure ? &failed_instance_id : nullptr, - TNetworkAddressToString(backend_state->impalad_address()))); + if (--num_remaining_backends_ == 0 || !status.ok()) { + backend_completion_cv_.NotifyAll(); } + return Status::OK(); } // If all results have been returned, return a cancelled status to force the fragment // instance to stop executing. - // TODO: Make returning CANCELLED unnecessary with IMPALA-6984. - return ReturnedAllResults() ? Status::CANCELLED : Status::OK(); + if (returned_all_results_) return Status::CANCELLED; + + return Status::OK(); +} + +RuntimeState* Coordinator::runtime_state() { + return coord_instance_ == nullptr ? nullptr : coord_instance_->runtime_state(); } // TODO: add histogram/percentile @@ -728,14 +740,20 @@ void Coordinator::ComputeQuerySummary() { string Coordinator::GetErrorLog() { ErrorLogMap merged; - { - lock_guard<SpinLock> l(backend_states_init_lock_); - for (BackendState* state: backend_states_) state->MergeErrorLog(&merged); + for (BackendState* state: backend_states_) { + state->MergeErrorLog(&merged); } return PrintErrorMapToString(merged); } void Coordinator::ReleaseExecResources() { + lock_guard<mutex> l(lock_); + ReleaseExecResourcesLocked(); +} + +void Coordinator::ReleaseExecResourcesLocked() { + if (released_exec_resources_) return; + released_exec_resources_ = true; if (filter_routing_table_.size() > 0) { query_profile_->AddInfoString("Final filter table", FilterDebugString()); } @@ -749,6 +767,8 @@ void Coordinator::ReleaseExecResources() { } // This may be NULL while executing UDFs. if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close(); + // Need to protect against failed Prepare(), where root_sink() would not be set. + if (coord_sink_ != nullptr) coord_sink_->CloseConsumer(); // Now that we've released our own resources, can release query-wide resources. if (query_state_ != nullptr) query_state_->ReleaseExecResourceRefcount(); // At this point some tracked memory may still be used in the coordinator for result @@ -756,21 +776,28 @@ void Coordinator::ReleaseExecResources() { } void Coordinator::ReleaseAdmissionControlResources() { - LOG(INFO) << "Release admission control resources for query_id=" << PrintId(query_id()); + lock_guard<mutex> l(lock_); + ReleaseAdmissionControlResourcesLocked(); +} + +void Coordinator::ReleaseAdmissionControlResourcesLocked() { + if (released_admission_control_resources_) return; + LOG(INFO) << "Release admission control resources for query_id=" + << PrintId(query_ctx().query_id); AdmissionController* admission_controller = ExecEnv::GetInstance()->admission_controller(); DCHECK(admission_controller != nullptr); admission_controller->ReleaseQuery(schedule_); + released_admission_control_resources_ = true; query_events_->MarkEvent("Released admission control resources"); } void Coordinator::UpdateFilter(const TUpdateFilterParams& params) { DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF) << "UpdateFilter() called although runtime filters are disabled"; - DCHECK(exec_rpcs_complete_barrier_.get() != nullptr) + DCHECK(exec_complete_barrier_.get() != nullptr) << "Filters received before fragments started!"; - - exec_rpcs_complete_barrier_->Wait(); + exec_complete_barrier_->Wait(); DCHECK(filter_routing_table_complete_) << "Filter received before routing table complete"; @@ -841,7 +868,6 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) { rpc_params.__set_dst_query_id(query_id()); rpc_params.__set_filter_id(params.filter_id); - // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid. for (BackendState* bs: backend_states_) { for (int fragment_idx: target_fragment_idxs) { rpc_params.__set_dst_fragment_idx(fragment_idx); @@ -915,19 +941,23 @@ void Coordinator::FilterState::Disable(MemTracker* tracker) { } } +const TUniqueId& Coordinator::query_id() const { + return query_ctx().query_id; +} + void Coordinator::GetTExecSummary(TExecSummary* exec_summary) { lock_guard<SpinLock> l(exec_summary_.lock); *exec_summary = exec_summary_.thrift_exec_summary; } MemTracker* Coordinator::query_mem_tracker() const { - return query_state_->query_mem_tracker(); + return query_state()->query_mem_tracker(); } void Coordinator::BackendsToJson(Document* doc) { Value states(kArrayType); { - lock_guard<SpinLock> l(backend_states_init_lock_); + lock_guard<mutex> l(lock_); for (BackendState* state : backend_states_) { Value val(kObjectType); state->ToJson(&val, doc); @@ -940,7 +970,7 @@ void Coordinator::BackendsToJson(Document* doc) { void Coordinator::FInstanceStatsToJson(Document* doc) { Value states(kArrayType); { - lock_guard<SpinLock> l(backend_states_init_lock_); + lock_guard<mutex> l(lock_); for (BackendState* state : backend_states_) { Value val(kObjectType); state->InstanceStatsToJson(&val, doc); @@ -950,14 +980,6 @@ void Coordinator::FInstanceStatsToJson(Document* doc) { doc->AddMember("backend_instances", states, doc->GetAllocator()); } -const TQueryCtx& Coordinator::query_ctx() const { - return schedule_.request().query_ctx; -} - -const TUniqueId& Coordinator::query_id() const { - return query_ctx().query_id; -} - const TFinalizeParams* Coordinator::finalize_params() const { return schedule_.request().__isset.finalize_params ? &schedule_.request().finalize_params : nullptr; http://git-wip-us.apache.org/repos/asf/impala/blob/4fab4288/be/src/runtime/coordinator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index 36c9f26..723047b 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -20,15 +20,26 @@ #include <string> #include <vector> +#include <boost/accumulators/accumulators.hpp> +#include <boost/accumulators/statistics/max.hpp> +#include <boost/accumulators/statistics/mean.hpp> +#include <boost/accumulators/statistics/median.hpp> +#include <boost/accumulators/statistics/min.hpp> +#include <boost/accumulators/statistics/stats.hpp> +#include <boost/accumulators/statistics/variance.hpp> #include <boost/scoped_ptr.hpp> #include <boost/unordered_map.hpp> +#include <boost/unordered_set.hpp> #include <rapidjson/document.h> #include "common/global-types.h" +#include "common/hdfs.h" #include "common/status.h" #include "gen-cpp/Frontend_types.h" #include "gen-cpp/Types_types.h" #include "runtime/dml-exec-state.h" +#include "runtime/query-state.h" +#include "scheduling/query-schedule.h" #include "util/condition-variable.h" #include "util/progress-updater.h" #include "util/runtime-profile-counters.h" @@ -45,7 +56,6 @@ class TPlanExecRequest; class TRuntimeProfileTree; class RuntimeProfile; class QueryResultSet; -class QuerySchedule; class MemTracker; class PlanRootSink; class FragmentInstanceState; @@ -55,9 +65,10 @@ class QueryState; /// Query coordinator: handles execution of fragment instances on remote nodes, given a /// TQueryExecRequest. As part of that, it handles all interactions with the executing /// backends; it is also responsible for implementing all client requests regarding the -/// query, including cancellation. Once a query ends, either by returning EOS, through -/// client cancellation, returning an error, or by finalizing a DML request, the -/// coordinator releases resources. +/// query, including cancellation. Once a query ends, either through cancellation or +/// by returning eos, the coordinator releases resources. (Note that DML requests +/// always end with cancellation, via ImpalaServer::UnregisterQuery()/ +/// ImpalaServer::CancelInternal()/ClientRequestState::Cancel().) /// /// The coordinator monitors the execution status of fragment instances and aborts the /// entire query if an error is reported by any of them. @@ -66,80 +77,80 @@ class QueryState; /// rows are produced by a fragment instance that always executes on the same machine as /// the coordinator. /// -/// Thread-safe except where noted. -/// +/// Thread-safe, with the exception of GetNext(). +// /// A typical sequence of calls for a single query (calls under the same numbered /// item can happen concurrently): /// 1. client: Exec() /// 2. client: Wait()/client: Cancel()/backend: UpdateBackendExecStatus() /// 3. client: GetNext()*/client: Cancel()/backend: UpdateBackendExecStatus() /// -/// A query is considered to be executing until one of three things occurs: -/// 1. An error is encountered. Backend cancellation is automatically initiated for all -/// backends that haven't yet completed and the overall query status is set to the -/// first (non-cancelled) encountered error status. -/// 2. The query is cancelled via an explicit Cancel() call. The overall query status -/// is set to CANCELLED and cancellation is initiated for all backends still -/// executing (without an error status). -/// 3. The query has returned all rows. The overall query status is OK (and remains -/// OK). Client cancellation is no longer possible and subsequent backend errors are -/// ignored. (TODO: IMPALA-6984 initiate backend cancellation in this case). -/// -/// Lifecycle: this object must not be destroyed until after one of the three states -/// above is reached (error, cancelled, or EOS) to ensure resources are released. -/// -/// Lock ordering: (lower-numbered acquired before higher-numbered) -/// 1. wait_lock_ -/// 2. exec_state_lock_, backend_states_init_lock_, filter_lock_, -/// ExecSummary::lock (leafs) +/// The implementation ensures that setting an overall error status and initiating +/// cancellation of all fragment instances is atomic. /// /// TODO: move into separate subdirectory and move nested classes into separate files /// and unnest them +/// TODO: clean up locking behavior; in particular, clarify dependency on lock_ +/// TODO: clarify cancellation path; in particular, cancel as soon as we return +/// all results class Coordinator { // NOLINT: The member variables could be re-ordered to save space public: Coordinator(const QuerySchedule& schedule, RuntimeProfile::EventSequence* events); ~Coordinator(); - /// Initiate asynchronous execution of a query with the given schedule. When it - /// returns, all fragment instances have started executing at their respective - /// backends. Exec() must be called exactly once and a call to Exec() must precede - /// all other member function calls. + /// Initiate asynchronous execution of a query with the given schedule. When it returns, + /// all fragment instances have started executing at their respective backends. + /// A call to Exec() must precede all other member function calls. Status Exec() WARN_UNUSED_RESULT; /// Blocks until result rows are ready to be retrieved via GetNext(), or, if the - /// query doesn't return rows, until the query finishes or is cancelled. A call to - /// Wait() must precede all calls to GetNext(). Multiple calls to Wait() are - /// idempotent and it is okay to issue multiple Wait() calls concurrently. + /// query doesn't return rows, until the query finishes or is cancelled. + /// A call to Wait() must precede all calls to GetNext(). + /// Multiple calls to Wait() are idempotent and it is okay to issue multiple + /// Wait() calls concurrently. Status Wait() WARN_UNUSED_RESULT; /// Fills 'results' with up to 'max_rows' rows. May return fewer than 'max_rows' - /// rows, but will not return more. If *eos is true, all rows have been returned. - /// Returns a non-OK status if an error was encountered either locally or by any of - /// the executing backends, or if the query was cancelled via Cancel(). After *eos - /// is true, subsequent calls to GetNext() will be a no-op. + /// rows, but will not return more. + /// + /// If *eos is true, execution has completed. Subsequent calls to GetNext() will be a + /// no-op. + /// + /// GetNext() will not set *eos=true until all fragment instances have either completed + /// or have failed. + /// + /// Returns an error status if an error was encountered either locally or by any of the + /// remote fragments or if the query was cancelled. /// /// GetNext() is not thread-safe: multiple threads must not make concurrent GetNext() - /// calls. + /// calls (but may call any of the other member functions concurrently with GetNext()). Status GetNext(QueryResultSet* results, int max_rows, bool* eos) WARN_UNUSED_RESULT; - /// Cancel execution of query and sets the overall query status to CANCELLED if the - /// query is still executing. Idempotent. - void Cancel(); + /// Cancel execution of query. This includes the execution of the local plan fragment, + /// if any, as well as all plan fragments on remote nodes. Sets query_status_ to the + /// given cause if non-NULL. Otherwise, sets query_status_ to Status::CANCELLED. + /// Idempotent. + void Cancel(const Status* cause = nullptr); - /// Called by the report status RPC handler to update execution status of a - /// particular backend as well as dml_exec_state_ and the profile. + /// Updates execution status of a particular backend as well as dml_exec_state_. + /// Also updates num_remaining_backends_ and cancels execution if the backend has an + /// error status. Status UpdateBackendExecStatus(const TReportExecStatusParams& params) WARN_UNUSED_RESULT; + /// Only valid to call after Exec(). + QueryState* query_state() const { return query_state_; } + /// Get cumulative profile aggregated over all fragments of the query. /// This is a snapshot of the current state of execution and will change in /// the future if not all fragments have finished execution. RuntimeProfile* query_profile() const { return query_profile_; } - /// Safe to call only after Exec(). + const TUniqueId& query_id() const; + MemTracker* query_mem_tracker() const; - /// Safe to call only after Wait(). + /// This is safe to call only after Wait() DmlExecState* dml_exec_state() { return &dml_exec_state_; } /// Return error log for coord and all the fragments. The error messages from the @@ -148,6 +159,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save const ProgressUpdater& progress() { return progress_; } + /// Returns query_status_. + Status GetStatus(); + /// Get a copy of the current exec summary. Thread-safe. void GetTExecSummary(TExecSummary* exec_summary); @@ -174,20 +188,18 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// owned by the ClientRequestState that owns this coordinator const QuerySchedule& schedule_; - /// Copied from TQueryExecRequest, governs when finalization occurs. Set in Exec(). + /// copied from TQueryExecRequest, governs when to call ReportQuerySummary TStmtType::type stmt_type_; - /// BackendStates for all execution backends, including the coordinator. All elements - /// are non-nullptr and owned by obj_pool(). Populated by Exec()/InitBackendStates(). + /// BackendStates for all execution backends, including the coordinator. + /// All elements are non-nullptr. Owned by obj_pool(). Populated by + /// InitBackendExec(). std::vector<BackendState*> backend_states_; - /// Protects the population of backend_states_ vector (not the BackendState objects). - /// Used when accessing backend_states_ if it's not guaranteed that - /// InitBackendStates() has completed. - SpinLock backend_states_init_lock_; + // index into backend_states_ for coordinator fragment; -1 if no coordinator fragment + int coord_backend_idx_ = -1; - /// The QueryState for this coordinator. Reference taken in Exec(). Reference - /// released in destructor. + /// The QueryState for this coordinator. Set in Exec(). Released in TearDown(). QueryState* query_state_ = nullptr; /// Non-null if and only if the query produces results for the client; i.e. is of @@ -198,10 +210,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// Result rows are materialized by this fragment instance in its own thread. They are /// materialized into a QueryResultSet provided to the coordinator during GetNext(). /// - /// Owned by the QueryState. Set in Exec(). + /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied + /// reference of QueryState released) in TearDown(). FragmentInstanceState* coord_instance_ = nullptr; - /// Owned by the QueryState. Set in Exec(). + /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() or when + /// GetNext() hits eos. PlanRootSink* coord_sink_ = nullptr; /// ensures single-threaded execution of Wait(). See lock ordering class comment. @@ -209,17 +223,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save bool has_called_wait_ = false; // if true, Wait() was called; protected by wait_lock_ - /// Keeps track of number of completed ranges and total scan ranges. Initialized by - /// Exec(). + /// Keeps track of number of completed ranges and total scan ranges. ProgressUpdater progress_; - /// Aggregate counters for the entire query. Lives in 'obj_pool_'. Set in Exec(). - RuntimeProfile* query_profile_ = nullptr; - - /// Total time spent in finalization (typically 0 except for INSERT into hdfs - /// tables). Set in Exec(). - RuntimeProfile::Counter* finalization_timer_ = nullptr; - /// Total number of filter updates received (always 0 if filter mode is not /// GLOBAL). Excludes repeated broadcast filter updates. Set in Exec(). RuntimeProfile::Counter* filter_updates_received_ = nullptr; @@ -250,7 +256,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save void Init(const QuerySchedule& query_schedule); }; - // Initialized by Exec(). ExecSummary exec_summary_; /// Filled in as the query completes and tracks the results of DML queries. This is @@ -258,40 +263,52 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// coordinator fragment: only one of the two can legitimately produce updates. DmlExecState dml_exec_state_; + /// Aggregate counters for the entire query. Lives in 'obj_pool_'. + RuntimeProfile* query_profile_ = nullptr; + + /// Protects all fields below. This is held while making RPCs, so this lock should + /// only be acquired if the acquiring thread is prepared to wait for a significant + /// time. + /// TODO: clarify to what extent the fields below need to be protected by lock_ + /// Lock ordering is + /// 1. wait_lock_ + /// 2. lock_ + /// 3. BackendState::lock_ + /// 4. filter_lock_ + boost::mutex lock_; + + /// Overall status of the entire query; set to the first reported fragment error + /// status or to CANCELLED, if Cancel() is called. + Status query_status_; + + /// If true, the query is done returning all results. It is possible that the + /// coordinator still needs to wait for cleanup on remote fragments (e.g. queries + /// with limit) + /// Once this is set to true, errors from execution backends are ignored. + bool returned_all_results_ = false; + + /// If there is no coordinator fragment, Wait() simply waits until all + /// backends report completion by notifying on backend_completion_cv_. + /// Tied to lock_. + ConditionVariable backend_completion_cv_; + + /// Count of the number of backends for which done != true. When this + /// hits 0, any Wait()'ing thread is notified + int num_remaining_backends_ = 0; + /// Event timeline for this query. Not owned. RuntimeProfile::EventSequence* query_events_ = nullptr; - /// Indexed by fragment idx (TPlanFragment.idx). Filled in - /// Exec()/InitFragmentStats(), elements live in obj_pool(). Updated by BackendState - /// sequentially, without synchronization. + /// Indexed by fragment idx (TPlanFragment.idx). Filled in InitFragmentStats(), + /// elements live in obj_pool(). std::vector<FragmentStats*> fragment_stats_; - /// Barrier that is released when all calls to BackendState::Exec() have - /// returned. Initialized in StartBackendExec(). - boost::scoped_ptr<CountingBarrier> exec_rpcs_complete_barrier_; - - /// Barrier that is released when all backends have indicated execution completion, - /// or when all backends are cancelled due to an execution error or client requested - /// cancellation. Initialized in StartBackendExec(). - boost::scoped_ptr<CountingBarrier> backend_exec_complete_barrier_; - - SpinLock exec_state_lock_; // protects exec-state_ and exec_status_ - - /// EXECUTING: in-flight; the only non-terminal state - /// RETURNED_RESULTS: GetNext() set eos to true, or for DML, the request is complete - /// CANCELLED: Cancel() was called (not: someone called CancelBackends()) - /// ERROR: received an error from a backend - enum class ExecState { - EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR - }; - ExecState exec_state_ = ExecState::EXECUTING; + /// total time spent in finalization (typically 0 except for INSERT into hdfs tables) + RuntimeProfile::Counter* finalization_timer_ = nullptr; - /// Overall execution status; only set on exec_state_ transitions: - /// - EXECUTING: OK - /// - RETURNED_RESULTS: OK - /// - CANCELLED: CANCELLED - /// - ERROR: error status - Status exec_status_; + /// Barrier that is released when all calls to ExecRemoteFragment() have + /// returned, successfully or not. Initialised during Exec(). + boost::scoped_ptr<CountingBarrier> exec_complete_barrier_; /// Protects filter_routing_table_. SpinLock filter_lock_; @@ -304,6 +321,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// safe to concurrently read from filter_routing_table_. bool filter_routing_table_complete_ = false; + /// True if and only if ReleaseExecResources() has been called. + bool released_exec_resources_ = false; + + /// True if and only if ReleaseAdmissionControlResources() has been called. + bool released_admission_control_resources_ = false; + /// Returns a local object pool. ObjectPool* obj_pool() { return obj_pool_.get(); } @@ -311,67 +334,36 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// HDFS INSERT finalization is not required. const TFinalizeParams* finalize_params() const; - const TQueryCtx& query_ctx() const; + const TQueryCtx& query_ctx() const { return schedule_.request().query_ctx; } - const TUniqueId& query_id() const; + /// Only valid *after* calling Exec(). Return nullptr if the running query does not + /// produce any rows. + RuntimeState* runtime_state(); /// Returns a pretty-printed table of the current filter state. std::string FilterDebugString(); - /// Called when the query is done executing due to reaching EOS or client - /// cancellation. If 'exec_state_' != EXECUTING, does nothing. Otherwise sets - /// 'exec_state_' to 'state' (must be either CANCELLED or RETURNED_RESULTS), and - /// finalizes execution (cancels remaining backends if transitioning to CANCELLED; - /// either way, calls ComputeQuerySummary() and releases resources). Returns the - /// resulting overall execution status. - Status SetNonErrorTerminalState(const ExecState state) WARN_UNUSED_RESULT; - - /// Transitions 'exec_state_' given an execution status and returns the resulting - /// overall status: - /// - /// - if the 'status' parameter is ok, no state transition - /// - if 'exec_state_' is EXECUTING and 'status' is not ok, transitions to ERROR - /// - if 'exec_state_' is already RETURNED_RESULTS, CANCELLED, or ERROR: does not - /// transition state (those are terminal states) however in the case of ERROR, - /// status may be updated to a more interesting status. - /// - /// Should not be called for (client initiated) cancellation. Call - /// SetNonErrorTerminalState(CANCELLED) instead. - /// - /// 'failed_finstance' is the fragment instance id that has failed (or nullptr if the - /// failure is not specific to a fragment instance), used for error reporting along - /// with 'instance_hostname'. - Status UpdateExecState(const Status& status, const TUniqueId* failed_finstance, - const string& instance_hostname) WARN_UNUSED_RESULT; - - /// Helper for SetNonErrorTerminalState() and UpdateExecStateIfError(). If the caller - /// transitioned to a terminal state (which happens exactly once for the lifetime of - /// the Coordinator object), then finalizes execution (cancels remaining backends if - /// transitioning to CANCELLED; in all cases releases resources and calls - /// ComputeQuerySummary(). - void HandleExecStateTransition(const ExecState old_state, const ExecState new_state); - - /// Return true if 'exec_state_' is RETURNED_RESULTS. - /// TODO: remove with IMPALA-6984. - bool ReturnedAllResults() WARN_UNUSED_RESULT; - - /// Return the string representation of 'state'. - static const char* ExecStateToString(const ExecState state); - - // For DCHECK_EQ, etc of ExecState values. - friend std::ostream& operator<<(std::ostream& o, const ExecState s) { - return o << ExecStateToString(s); - } - - /// Helper for HandleExecStateTransition(). Sends cancellation request to all - /// executing backends but does not wait for acknowledgement from the backends. The - /// ExecState state-machine ensures this is called at most once. - void CancelBackends(); - - /// Returns only when either all execution backends have reported success or a request - /// to cancel the backends has already been sent. It is safe to call this concurrently, - /// but any calls must be made only after Exec(). - void WaitForBackends(); + /// Cancels all fragment instances. Assumes that lock_ is held. This may be called when + /// the query is not being cancelled in the case where the query limit is reached. + void CancelInternal(); + + /// Acquires lock_ and updates query_status_ with 'status' if it's not already + /// an error status, and returns the current query_status_. The status may be + /// due to an error in a specific fragment instance, or it can be a general error + /// not tied to a specific fragment instance. + /// Calls CancelInternal() when switching to an error status. + /// When an error is due to a specific fragment instance, 'is_fragment_failure' must + /// be true and 'failed_fragment' is the fragment_id that has failed, used for error + /// reporting. For a general error not tied to a specific instance, + /// 'is_fragment_failure' must be false and 'failed_fragment' will be ignored. + /// 'backend_hostname' is used for error reporting in either case. + Status UpdateStatus(const Status& status, const std::string& backend_hostname, + bool is_fragment_failure, const TUniqueId& failed_fragment) WARN_UNUSED_RESULT; + + /// Returns only when either all execution backends have reported success or the query + /// is in error. Returns the status of the query. + /// It is safe to call this concurrently, but any calls must be made only after Exec(). + Status WaitForBackendCompletion() WARN_UNUSED_RESULT; /// Initializes fragment_stats_ and query_profile_. Must be called before /// InitBackendStates(). @@ -393,33 +385,36 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// finishing the INSERT in flight. Status FinalizeHdfsInsert() WARN_UNUSED_RESULT; - /// Helper for Exec(). Populates backend_states_, starts query execution at all - /// backends in parallel, and blocks until startup completes. + /// Populates backend_states_, starts query execution at all backends in parallel, and + /// blocks until startup completes. void StartBackendExec(); - /// Helper for Exec(). Checks for errors encountered when starting backend execution, - /// using any non-OK status, if any, as the overall status. Returns the overall - /// status. Also updates query_profile_ with the startup latency histogram. + /// Calls CancelInternal() and returns an error if there was any error starting + /// backend execution. + /// Also updates query_profile_ with the startup latency histogram. Status FinishBackendStartup() WARN_UNUSED_RESULT; /// Build the filter routing table by iterating over all plan nodes and collecting the /// filters that they either produce or consume. void InitFilterRoutingTable(); - /// Helper for HandleExecStateTransition(). Releases all resources associated with - /// query execution. The ExecState state-machine ensures this is called exactly once. + /// Releases all resources associated with query execution. Acquires lock_. Idempotent. void ReleaseExecResources(); - /// Helper for HandleExecStateTransition(). Releases admission control resources for - /// use by other queries. This should only be called if one of following - /// preconditions is satisfied for each backend on which the query is executing: - /// - /// * The backend finished execution. Rationale: the backend isn't consuming - /// resources. + /// Same as ReleaseExecResources() except the lock must be held by the caller. + void ReleaseExecResourcesLocked(); + + /// Releases admission control resources for use by other queries. + /// This should only be called if one of following preconditions is satisfied for each + /// backend on which the query is executing: + /// * The backend finished execution. + /// Rationale: the backend isn't consuming resources. + // /// * A cancellation RPC was delivered to the backend. /// Rationale: the backend will be cancelled and release resources soon. By the /// time a newly admitted query fragment starts up on the backend and starts consuming /// resources, the resources from this query will probably have been released. + // /// * Sending the cancellation RPC to the backend failed /// Rationale: the backend is either down or will tear itself down when it next tries /// to send a status RPC to the coordinator. It's possible that the fragment will be @@ -430,13 +425,16 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// pessimistically queueing queries while we wait for a response from a backend that /// may never come. /// - /// Calling WaitForBackends() or CancelBackends() before this function is sufficient - /// to satisfy the above preconditions. If the query has an expensive finalization - /// step post query execution (e.g. a DML statement), then this should be called - /// after that completes to avoid over-admitting queries. + /// Calling WaitForBackendCompletion() or CancelInternal() before this function is + /// sufficient to satisfy the above preconditions. If the query has an expensive + /// finalization step post query execution (e.g. a DML statement), then this should + /// be called after that completes to avoid over-admitting queries. /// - /// The ExecState state-machine ensures this is called exactly once. + /// Acquires lock_. Idempotent. void ReleaseAdmissionControlResources(); + + /// Same as ReleaseAdmissionControlResources() except lock must be held by caller. + void ReleaseAdmissionControlResourcesLocked(); }; } http://git-wip-us.apache.org/repos/asf/impala/blob/4fab4288/be/src/service/client-request-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 64cf219..5186c6f 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -904,7 +904,7 @@ Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) { // Cancel the parent query. 'lock_' should not be held because cancellation involves // RPCs and can block for a long time. - if (coord != NULL) coord->Cancel(); + if (coord != NULL) coord->Cancel(cause); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/impala/blob/4fab4288/be/src/service/impala-server.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 3af4c9b..fb3f261 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -111,6 +111,11 @@ class ClientRequestState; /// 6. ClientRequestState::expiration_data_lock_ /// 7. Coordinator::exec_summary_lock /// +/// Coordinator::lock_ should not be acquired at the same time as the +/// ImpalaServer/SessionState/ClientRequestState locks. Aside from +/// Coordinator::exec_summary_lock_ the Coordinator's lock ordering is independent of +/// the above lock ordering. +/// /// The following locks are not held in conjunction with other locks: /// * query_log_lock_ /// * session_timeout_lock_ http://git-wip-us.apache.org/repos/asf/impala/blob/4fab4288/be/src/util/counting-barrier.h ---------------------------------------------------------------------- diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h index 827c526..49b0bde 100644 --- a/be/src/util/counting-barrier.h +++ b/be/src/util/counting-barrier.h @@ -33,23 +33,8 @@ class CountingBarrier { } /// Sends one notification, decrementing the number of pending notifications by one. - /// Returns the remaining pending notifications. - int32_t Notify() { - int32_t result = count_.Add(-1); - if (result == 0) promise_.Set(true); - return result; - } - - /// Sets the number of pending notifications to 0 and unblocks Wait(). - void NotifyRemaining() { - while (true) { - int32_t value = count_.Load(); - if (value <= 0) return; // count_ can legitimately drop below 0 - if (count_.CompareAndSwap(value, 0)) { - promise_.Set(true); - return; - } - } + void Notify() { + if (count_.Add(-1) == 0) promise_.Set(true); } /// Blocks until all notifications are received. @@ -59,8 +44,6 @@ class CountingBarrier { /// case '*timed_out' will be true. void Wait(int64_t timeout_ms, bool* timed_out) { promise_.Get(timeout_ms, timed_out); } - int32_t pending() const { return count_.Load(); } - private: /// Used to signal waiters when all notifications are received. Promise<bool> promise_;
