IMPALA-7207: make Coordinator::exec_state_ an atomic enum. That allows us to avoid taking the lock in cases where only the exec_state_ field needs to be read (as opposed to needing to read both exec_state_ and exec_status_). In particular, it avoids the lock on the non-terminating paths, which is the common case.
Change-Id: Ie6c5d5c6ccfdfd533cd0607aab6f554e664b90ac Reviewed-on: http://gerrit.cloudera.org:8080/10811 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b9e19d09 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b9e19d09 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b9e19d09 Branch: refs/heads/master Commit: b9e19d093ce3339f0ce135de5aa36ae98747ea6c Parents: 4dad7ed Author: Dan Hecht <[email protected]> Authored: Mon Jun 25 10:47:54 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Mon Jun 25 22:25:22 2018 +0000 ---------------------------------------------------------------------- be/src/common/atomic.h | 3 ++- be/src/runtime/coordinator.cc | 21 +++++++-------------- be/src/runtime/coordinator.h | 12 ++++++++---- be/src/runtime/fragment-instance-state.h | 3 ++- 4 files changed, 19 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/b9e19d09/be/src/common/atomic.h ---------------------------------------------------------------------- diff --git a/be/src/common/atomic.h b/be/src/common/atomic.h index 4c72826..5aa5f86 100644 --- a/be/src/common/atomic.h +++ b/be/src/common/atomic.h @@ -153,11 +153,12 @@ class AtomicEnum { "Underlying enum type must fit into 4 bytes"); public: + AtomicEnum(T initial) : enum_(static_cast<int32_t>(initial)) {} /// Atomic load with "acquire" memory-ordering semantic. ALWAYS_INLINE T Load() const { return static_cast<T>(enum_.Load()); } /// Atomic store with "release" memory-ordering semantic. 
- ALWAYS_INLINE void Store(T val) { enum_.Store(val); } + ALWAYS_INLINE void Store(T val) { enum_.Store(static_cast<int32_t>(val)); } private: internal::AtomicInt<int32_t> enum_; http://git-wip-us.apache.org/repos/asf/impala/blob/b9e19d09/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 2f14825..5470be6 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -76,7 +76,7 @@ Coordinator::Coordinator( Coordinator::~Coordinator() { // Must have entered a terminal exec state guaranteeing resources were released. - DCHECK_NE(exec_state_, ExecState::EXECUTING); + DCHECK_NE(exec_state_.Load(), ExecState::EXECUTING); DCHECK_LE(backend_exec_complete_barrier_->pending(), 0); // Release the coordinator's reference to the query control structures. if (query_state_ != nullptr) { @@ -450,9 +450,9 @@ Status Coordinator::SetNonErrorTerminalState(const ExecState state) { { lock_guard<SpinLock> l(exec_state_lock_); // May have already entered a terminal state, in which case nothing to do. - if (exec_state_ != ExecState::EXECUTING) return exec_status_; + if (exec_state_.Load() != ExecState::EXECUTING) return exec_status_; DCHECK(exec_status_.ok()) << exec_status_; - exec_state_ = state; + exec_state_.Store(state); if (state == ExecState::CANCELLED) exec_status_ = Status::CANCELLED; ret_status = exec_status_; } @@ -468,13 +468,13 @@ Status Coordinator::UpdateExecState(const Status& status, ExecState old_state, new_state; { lock_guard<SpinLock> l(exec_state_lock_); - old_state = exec_state_; + old_state = exec_state_.Load(); if (old_state == ExecState::EXECUTING) { DCHECK(exec_status_.ok()) << exec_status_; if (!status.ok()) { // Error while executing - go to ERROR state. 
exec_status_ = status; - exec_state_ = ExecState::ERROR; + exec_state_.Store(ExecState::ERROR); } } else if (old_state == ExecState::RETURNED_RESULTS) { // Already returned all results. Leave exec status as ok, stay in this state. @@ -492,7 +492,7 @@ Status Coordinator::UpdateExecState(const Status& status, exec_status_ = status; } } - new_state = exec_state_; + new_state = exec_state_.Load(); ret_status = exec_status_; } // Log interesting status: a non-cancelled error or a cancellation if was executing. @@ -508,11 +508,6 @@ Status Coordinator::UpdateExecState(const Status& status, return ret_status; } -bool Coordinator::ReturnedAllResults() { - lock_guard<SpinLock> l(exec_state_lock_); - return exec_state_ == ExecState::RETURNED_RESULTS; -} - void Coordinator::HandleExecStateTransition( const ExecState old_state, const ExecState new_state) { static const unordered_map<ExecState, const char *> exec_state_to_event{ @@ -623,9 +618,7 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) { DCHECK(has_called_wait_); SCOPED_TIMER(query_profile_->total_time_counter()); - // exec_state_lock_ not needed here though since this path won't execute concurrently - // with itself or DML finalization. - if (exec_state_ == ExecState::RETURNED_RESULTS) { + if (ReturnedAllResults()) { // Nothing left to do: already in a terminal state and no more results. 
*eos = true; return Status::OK(); http://git-wip-us.apache.org/repos/asf/impala/blob/b9e19d09/be/src/runtime/coordinator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index 5bb399f..a0dce35 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -29,7 +29,6 @@ #include "gen-cpp/Frontend_types.h" #include "gen-cpp/Types_types.h" #include "runtime/dml-exec-state.h" -#include "util/condition-variable.h" #include "util/progress-updater.h" #include "util/runtime-profile-counters.h" #include "util/spinlock.h" @@ -276,7 +275,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// cancellation. Initialized in StartBackendExec(). boost::scoped_ptr<CountingBarrier> backend_exec_complete_barrier_; - SpinLock exec_state_lock_; // protects exec-state_ and exec_status_ + // Protects exec_state_ and exec_status_. exec_state_ can be read independently via + // the atomic, but the lock is held when writing either field and when reading both + // fields together. + SpinLock exec_state_lock_; /// EXECUTING: in-flight; the only non-terminal state /// RETURNED_RESULTS: GetNext() set eos to true, or for DML, the request is complete @@ -285,7 +287,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save enum class ExecState { EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR }; - ExecState exec_state_ = ExecState::EXECUTING; + AtomicEnum<ExecState> exec_state_{ExecState::EXECUTING}; /// Overall execution status; only set on exec_state_ transitions: /// - EXECUTING: OK @@ -357,7 +359,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// Return true if 'exec_state_' is RETURNED_RESULTS. /// TODO: remove with IMPALA-6984. 
- bool ReturnedAllResults() WARN_UNUSED_RESULT; + bool ReturnedAllResults() WARN_UNUSED_RESULT { + return exec_state_.Load() == ExecState::RETURNED_RESULTS; + } /// Return the string representation of 'state'. static const char* ExecStateToString(const ExecState state); http://git-wip-us.apache.org/repos/asf/impala/blob/b9e19d09/be/src/runtime/fragment-instance-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h index 8295c8f..df27f9c 100644 --- a/be/src/runtime/fragment-instance-state.h +++ b/be/src/runtime/fragment-instance-state.h @@ -204,7 +204,8 @@ class FragmentInstanceState { /// The current state of this fragment instance's execution. Only updated by the /// fragment instance thread in UpdateState() and read by the profile reporting threads. - AtomicEnum<TFInstanceExecState::type> current_state_; + AtomicEnum<TFInstanceExecState::type> current_state_{ + TFInstanceExecState::WAITING_FOR_EXEC}; /// Output sink for rows sent to this fragment. Created in Prepare(), lives in /// obj_pool().
