IMPALA-7207: make Coordinator::exec_state_ an atomic enum

That allows us to avoid taking the lock in cases where only
the exec_state_ field needs to be read (as opposed to needing
to read both exec_state_ and exec_status_). In particular,
it avoids the lock on the non-terminating paths, which is
the common case.

Change-Id: Ie6c5d5c6ccfdfd533cd0607aab6f554e664b90ac
Reviewed-on: http://gerrit.cloudera.org:8080/10811
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b9e19d09
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b9e19d09
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b9e19d09

Branch: refs/heads/master
Commit: b9e19d093ce3339f0ce135de5aa36ae98747ea6c
Parents: 4dad7ed
Author: Dan Hecht <[email protected]>
Authored: Mon Jun 25 10:47:54 2018 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Mon Jun 25 22:25:22 2018 +0000

----------------------------------------------------------------------
 be/src/common/atomic.h                   |  3 ++-
 be/src/runtime/coordinator.cc            | 21 +++++++--------------
 be/src/runtime/coordinator.h             | 12 ++++++++----
 be/src/runtime/fragment-instance-state.h |  3 ++-
 4 files changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b9e19d09/be/src/common/atomic.h
----------------------------------------------------------------------
diff --git a/be/src/common/atomic.h b/be/src/common/atomic.h
index 4c72826..5aa5f86 100644
--- a/be/src/common/atomic.h
+++ b/be/src/common/atomic.h
@@ -153,11 +153,12 @@ class AtomicEnum {
       "Underlying enum type must fit into 4 bytes");
 
  public:
+  AtomicEnum(T initial) : enum_(static_cast<int32_t>(initial)) {}
   /// Atomic load with "acquire" memory-ordering semantic.
   ALWAYS_INLINE T Load() const { return static_cast<T>(enum_.Load()); }
 
   /// Atomic store with "release" memory-ordering semantic.
-  ALWAYS_INLINE void Store(T val) { enum_.Store(val); }
+  ALWAYS_INLINE void Store(T val) { enum_.Store(static_cast<int32_t>(val)); }
 
  private:
   internal::AtomicInt<int32_t> enum_;

http://git-wip-us.apache.org/repos/asf/impala/blob/b9e19d09/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 2f14825..5470be6 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -76,7 +76,7 @@ Coordinator::Coordinator(
 
 Coordinator::~Coordinator() {
   // Must have entered a terminal exec state guaranteeing resources were released.
-  DCHECK_NE(exec_state_, ExecState::EXECUTING);
+  DCHECK_NE(exec_state_.Load(), ExecState::EXECUTING);
   DCHECK_LE(backend_exec_complete_barrier_->pending(), 0);
   // Release the coordinator's reference to the query control structures.
   if (query_state_ != nullptr) {
@@ -450,9 +450,9 @@ Status Coordinator::SetNonErrorTerminalState(const ExecState state) {
   {
     lock_guard<SpinLock> l(exec_state_lock_);
     // May have already entered a terminal state, in which case nothing to do.
-    if (exec_state_ != ExecState::EXECUTING) return exec_status_;
+    if (exec_state_.Load() != ExecState::EXECUTING) return exec_status_;
     DCHECK(exec_status_.ok()) << exec_status_;
-    exec_state_ = state;
+    exec_state_.Store(state);
     if (state == ExecState::CANCELLED) exec_status_ = Status::CANCELLED;
     ret_status = exec_status_;
   }
@@ -468,13 +468,13 @@ Status Coordinator::UpdateExecState(const Status& status,
   ExecState old_state, new_state;
   {
     lock_guard<SpinLock> l(exec_state_lock_);
-    old_state = exec_state_;
+    old_state = exec_state_.Load();
     if (old_state == ExecState::EXECUTING) {
       DCHECK(exec_status_.ok()) << exec_status_;
       if (!status.ok()) {
         // Error while executing - go to ERROR state.
         exec_status_ = status;
-        exec_state_ = ExecState::ERROR;
+        exec_state_.Store(ExecState::ERROR);
       }
     } else if (old_state == ExecState::RETURNED_RESULTS) {
       // Already returned all results. Leave exec status as ok, stay in this state.
@@ -492,7 +492,7 @@ Status Coordinator::UpdateExecState(const Status& status,
         exec_status_ = status;
       }
     }
-    new_state = exec_state_;
+    new_state = exec_state_.Load();
     ret_status = exec_status_;
   }
   // Log interesting status: a non-cancelled error or a cancellation if was executing.
@@ -508,11 +508,6 @@ Status Coordinator::UpdateExecState(const Status& status,
   return ret_status;
 }
 
-bool Coordinator::ReturnedAllResults() {
-  lock_guard<SpinLock> l(exec_state_lock_);
-  return exec_state_ == ExecState::RETURNED_RESULTS;
-}
-
 void Coordinator::HandleExecStateTransition(
     const ExecState old_state, const ExecState new_state) {
   static const unordered_map<ExecState, const char *> exec_state_to_event{
@@ -623,9 +618,7 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
   DCHECK(has_called_wait_);
   SCOPED_TIMER(query_profile_->total_time_counter());
 
-  // exec_state_lock_ not needed here though since this path won't execute concurrently
-  // with itself or DML finalization.
-  if (exec_state_ == ExecState::RETURNED_RESULTS) {
+  if (ReturnedAllResults()) {
     // Nothing left to do: already in a terminal state and no more results.
     *eos = true;
     return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/b9e19d09/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 5bb399f..a0dce35 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -29,7 +29,6 @@
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/dml-exec-state.h"
-#include "util/condition-variable.h"
 #include "util/progress-updater.h"
 #include "util/runtime-profile-counters.h"
 #include "util/spinlock.h"
@@ -276,7 +275,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// cancellation. Initialized in StartBackendExec().
   boost::scoped_ptr<CountingBarrier> backend_exec_complete_barrier_;
 
-  SpinLock exec_state_lock_; // protects exec-state_ and exec_status_
+  // Protects exec_state_ and exec_status_. exec_state_ can be read independently via
+  // the atomic, but the lock is held when writing either field and when reading both
+  // fields together.
+  SpinLock exec_state_lock_;
 
   /// EXECUTING: in-flight; the only non-terminal state
   /// RETURNED_RESULTS: GetNext() set eos to true, or for DML, the request is complete
@@ -285,7 +287,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   enum class ExecState {
     EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR
   };
-  ExecState exec_state_ = ExecState::EXECUTING;
+  AtomicEnum<ExecState> exec_state_{ExecState::EXECUTING};
 
   /// Overall execution status; only set on exec_state_ transitions:
   /// - EXECUTING: OK
@@ -357,7 +359,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   /// Return true if 'exec_state_' is RETURNED_RESULTS.
   /// TODO: remove with IMPALA-6984.
-  bool ReturnedAllResults() WARN_UNUSED_RESULT;
+  bool ReturnedAllResults() WARN_UNUSED_RESULT {
+    return exec_state_.Load() == ExecState::RETURNED_RESULTS;
+  }
 
   /// Return the string representation of 'state'.
   static const char* ExecStateToString(const ExecState state);

http://git-wip-us.apache.org/repos/asf/impala/blob/b9e19d09/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 8295c8f..df27f9c 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -204,7 +204,8 @@ class FragmentInstanceState {
 
   /// The current state of this fragment instance's execution. Only updated by the
   /// fragment instance thread in UpdateState() and read by the profile reporting threads.
-  AtomicEnum<TFInstanceExecState::type> current_state_;
+  AtomicEnum<TFInstanceExecState::type> current_state_{
+    TFInstanceExecState::WAITING_FOR_EXEC};
 
   /// Output sink for rows sent to this fragment. Created in Prepare(), lives in
   /// obj_pool().

Reply via email to