Repository: incubator-impala Updated Branches: refs/heads/master 545eab6d6 -> e993b9712
IMPALA-5892: Allow reporting status independent of fragment instance Queries can hit an error that is not specific to a particular fragment instance. For example, QueryState::StartFInstances() calls DescriptorTbl::Create() before any fragment instances start. This location has no reason to report status via a particular fragment, and there is currently no way to report status otherwise. This leads to a query hang, because the status is never propagated back to the coordinator. This adds the ability to report status that is not associated with a particular fragment instance. By reporting status, the coordinator will now correctly abort the query in the case of the QueryState::StartFInstances() scenario described. Change-Id: I4cd98022f1d62a999c7c80ff5474fa8d069eb12c Reviewed-on: http://gerrit.cloudera.org:8080/7943 Reviewed-by: Lars Volker <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/91f7bc19 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/91f7bc19 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/91f7bc19 Branch: refs/heads/master Commit: 91f7bc1947c1800e689fee040d4820fd8dbf94e4 Parents: 545eab6 Author: Joe McDonnell <[email protected]> Authored: Fri Sep 1 14:50:45 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Sep 6 21:26:24 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/coordinator-backend-state.cc | 14 ++++++++++- be/src/runtime/coordinator-backend-state.h | 21 +++++++++++++--- be/src/runtime/coordinator.cc | 32 ++++++++++++++---------- be/src/runtime/coordinator.h | 15 +++++++---- be/src/runtime/query-state.cc | 1 + common/thrift/ImpalaInternalService.thrift | 12 +++++++++ 6 files changed, 73 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/coordinator-backend-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc index 34e0671..1b7fd20 100644 --- a/be/src/runtime/coordinator-backend-state.cc +++ b/be/src/runtime/coordinator-backend-state.cc @@ -211,9 +211,12 @@ void Coordinator::BackendState::Exec( VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id_); } -Status Coordinator::BackendState::GetStatus(TUniqueId* failed_instance_id) { +Status Coordinator::BackendState::GetStatus(bool* is_fragment_failure, + TUniqueId* failed_instance_id) { lock_guard<mutex> l(lock_); + DCHECK_EQ(is_fragment_failure == nullptr, failed_instance_id == nullptr); if (!status_.ok() && failed_instance_id != nullptr) { + *is_fragment_failure = is_fragment_failure_; *failed_instance_id = failed_instance_id_; } return status_; @@ -278,6 +281,7 @@ bool Coordinator::BackendState::ApplyExecStatusReport( if (status_.ok() || status_.IsCancelled()) { status_ = instance_status; failed_instance_id_ = instance_exec_status.fragment_instance_id; + is_fragment_failure_ = true; } } DCHECK_GT(num_remaining_instances_, 0); @@ -302,6 +306,14 @@ bool Coordinator::BackendState::ApplyExecStatusReport( } } + // status_ has incorporated the status from all fragment instances. If the overall + // backend status is not OK, but no specific fragment instance reported an error, then + // this is a general backend error. Incorporate the general error into status_. 
+ Status overall_backend_status(backend_exec_status.status); + if (!overall_backend_status.ok() && (status_.ok() || status_.IsCancelled())) { + status_ = overall_backend_status; + } + // Log messages aggregated by type if (backend_exec_status.__isset.error_log && backend_exec_status.error_log.size() > 0) { // Append the log messages from each update with the global state of the query http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/coordinator-backend-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h index ccc3618..4ea2f33 100644 --- a/be/src/runtime/coordinator-backend-state.h +++ b/be/src/runtime/coordinator-backend-state.h @@ -90,9 +90,19 @@ class Coordinator::BackendState { /// if cancellation was attempted, false otherwise. bool Cancel(); - /// Return the overall execution status. For an error status, also return the id - /// of the instance that caused that status, if failed_instance_id != nullptr. - Status GetStatus(TUniqueId* failed_instance_id = nullptr) WARN_UNUSED_RESULT; + /// Return the overall execution status. For an error status, the error could come + /// from the fragment instance level or it can be a general error from the backend + /// (with no specific fragment responsible). For a caller to distinguish between + /// these errors and to determine the specific fragment instance (if applicable), + /// both 'is_fragment_failure' and 'failed_instance_id' must be non-null. + /// A general error will set *is_fragment_failure to false and leave + /// failed_instance_id untouched. + /// A fragment-specific error will set *is_fragment_failure to true and set + /// *failed_instance_id to the id of the fragment instance that failed. + /// If the caller does not need this information, both 'is_fragment_failure' and + /// 'failed_instance_id' must be omitted (using the default value of nullptr). 
+ Status GetStatus(bool* is_fragment_failure = nullptr, + TUniqueId* failed_instance_id = nullptr) WARN_UNUSED_RESULT; /// Return peak memory consumption. int64_t GetPeakConsumption(); @@ -199,6 +209,11 @@ class Coordinator::BackendState { /// initiated; either way, execution must not be cancelled. Status status_; + /// Used to distinguish between errors reported by a specific fragment instance, + /// which would set failed_instance_id_, rather than an error independent of any + /// specific fragment. + bool is_fragment_failure_ = false; + /// Id of the first fragment instance that reports an error status. /// Invalid if no fragment instance has reported an error status. TUniqueId failed_instance_id_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 029e0bc..c8df1f5 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -471,8 +471,8 @@ Status Coordinator::GetStatus() { return query_status_; } -Status Coordinator::UpdateStatus(const Status& status, const TUniqueId& instance_id, - const string& instance_hostname) { + Status Coordinator::UpdateStatus(const Status& status, const string& backend_hostname, + bool is_fragment_failure, const TUniqueId& instance_id) { { lock_guard<mutex> l(lock_); @@ -490,10 +490,14 @@ Status Coordinator::UpdateStatus(const Status& status, const TUniqueId& instance CancelInternal(); } - // Log the id of the fragment that first failed so we can track it down more easily. - VLOG_QUERY << "Query id=" << query_id() << " failed because instance id=" - << instance_id << " on host=" << instance_hostname << " failed."; - + if (is_fragment_failure) { + // Log the id of the fragment that first failed so we can track it down more easily. 
+ VLOG_QUERY << "Query id=" << query_id() << " failed because instance id=" + << instance_id << " on host=" << backend_hostname << " failed."; + } else { + VLOG_QUERY << "Query id=" << query_id() << " failed due to error on host=" + << backend_hostname; + } return query_status_; } @@ -822,8 +826,8 @@ Status Coordinator::Wait() { if (stmt_type_ == TStmtType::QUERY) { DCHECK(coord_instance_ != nullptr); - return UpdateStatus(coord_instance_->WaitForOpen(), - runtime_state()->fragment_instance_id(), FLAGS_hostname); + return UpdateStatus(coord_instance_->WaitForOpen(), FLAGS_hostname, true, + runtime_state()->fragment_instance_id()); } DCHECK_EQ(stmt_type_, TStmtType::DML); @@ -867,8 +871,8 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) { // if there was an error, we need to return the query's error status rather than // the status we just got back from the local executor (which may well be CANCELLED // in that case). Coordinator fragment failed in this case so we log the query_id. 
- RETURN_IF_ERROR( - UpdateStatus(status, runtime_state()->fragment_instance_id(), FLAGS_hostname)); + RETURN_IF_ERROR(UpdateStatus(status, FLAGS_hostname, true, + runtime_state()->fragment_instance_id())); if (*eos) { returned_all_results_ = true; @@ -950,11 +954,13 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param // true (UpdateStatus() initiates cancellation, if it hasn't already been) // TODO: clarify control flow here, it's unclear we should even process this status // report if returned_all_results_ is true + bool is_fragment_failure; TUniqueId failed_instance_id; - Status status = backend_state->GetStatus(&failed_instance_id); + Status status = backend_state->GetStatus(&is_fragment_failure, &failed_instance_id); if (!status.ok() && !returned_all_results_) { - Status ignored = UpdateStatus(status, failed_instance_id, - TNetworkAddressToString(backend_state->impalad_address())); + Status ignored = + UpdateStatus(status, TNetworkAddressToString(backend_state->impalad_address()), + is_fragment_failure, failed_instance_id); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/coordinator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index 4edef88..e67ef13 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -363,12 +363,17 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save void CancelInternal(); /// Acquires lock_ and updates query_status_ with 'status' if it's not already - /// an error status, and returns the current query_status_. + /// an error status, and returns the current query_status_. The status may be + /// due to an error in a specific fragment instance, or it can be a general error + /// not tied to a specific fragment instance. /// Calls CancelInternal() when switching to an error status. 
- /// failed_fragment is the fragment_id that has failed, used for error reporting along - /// with instance_hostname. - Status UpdateStatus(const Status& status, const TUniqueId& failed_fragment, - const std::string& instance_hostname) WARN_UNUSED_RESULT; + /// When an error is due to a specific fragment instance, 'is_fragment_failure' must + /// be true and 'failed_fragment' is the fragment_id that has failed, used for error + /// reporting. For a general error not tied to a specific instance, + /// 'is_fragment_failure' must be false and 'failed_fragment' will be ignored. + /// 'backend_hostname' is used for error reporting in either case. + Status UpdateStatus(const Status& status, const std::string& backend_hostname, + bool is_fragment_failure, const TUniqueId& failed_fragment) WARN_UNUSED_RESULT; /// Update per_partition_status_ and files_to_move_. void UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 5ac4998..4311e27 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -207,6 +207,7 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status, params.__set_query_id(query_ctx().query_id); DCHECK(rpc_params().__isset.coord_state_idx); params.__set_coord_state_idx(rpc_params().coord_state_idx); + status.SetTStatus(&params); if (fis != nullptr) { // create status for 'fis' http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 39df289..db6ffbb 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ 
b/common/thrift/ImpalaInternalService.thrift @@ -624,6 +624,18 @@ struct TReportExecStatusParams { // New errors that have not been reported to the coordinator by any of the // instances included in instance_exec_status 6: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log; + + // Cumulative status for this backend. A backend can have an error from a specific + // fragment instance, or it can have a general error that is independent of any + // individual fragment. If reporting a single error, this status is always set to + // the error being reported. If reporting multiple errors, the status is set by the + // following rules: + // 1. A general error takes precedence over any fragment instance error. + // 2. Any fragment instance error takes precedence over any cancelled status. + // 3. If multiple fragments have errors, prefer the error that comes first in the + // 'instance_exec_status' list. + // This status is only OK if all fragment instances included are OK. + 7: optional Status.TStatus status; } struct TReportExecStatusResult {
