Repository: impala
Updated Branches:
  refs/heads/2.x 22244bb07 -> da329442a


IMPALA-5384, part 2: Simplify Coordinator locking and clarify state

This is the final change to clarify and break up the Coordinator's lock.
The state machine for the coordinator is made explicit, distinguishing
between executing state and multiple terminal states. Logic to
transition into a terminal state is centralized in one location and
executes exactly once for each coordinator object.
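
For illustration only (not part of the commit): a minimal, self-contained C++
sketch of that pattern. The names below (CoordinatorSketch, HandleTransition,
strings standing in for Status) are hypothetical simplifications, not the
actual Coordinator members. State and status change only under a lock,
terminal states are sticky, and the transition side effects run off the lock,
at most once per object.

#include <iostream>
#include <mutex>
#include <string>

// EXECUTING is the only non-terminal state.
enum class ExecState { EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR };

class CoordinatorSketch {
 public:
  // Folds 'status' into the overall state ("" means OK) and returns the
  // overall status. Only the EXECUTING -> ERROR transition is sketched.
  std::string UpdateExecState(const std::string& status) {
    ExecState old_state, new_state;
    std::string ret;
    {
      std::lock_guard<std::mutex> l(lock_);
      old_state = state_;
      if (old_state == ExecState::EXECUTING && !status.empty()) {
        status_ = status;
        state_ = ExecState::ERROR;
      }
      new_state = state_;
      ret = status_;
    }
    // Terminal states are sticky, so only the thread that won the race out
    // of EXECUTING ever sees old_state != new_state and runs side effects.
    if (old_state != new_state) HandleTransition();
    return ret;
  }

 private:
  void HandleTransition() {
    std::cout << "cancel backends, release resources (runs exactly once)\n";
  }
  std::mutex lock_;
  ExecState state_ = ExecState::EXECUTING;
  std::string status_;  // empty string means OK
};

int main() {
  CoordinatorSketch coord;
  coord.UpdateExecState("");            // OK report while executing: no-op
  coord.UpdateExecState("rpc failed");  // first error: transition once
  coord.UpdateExecState("later error"); // ignored: ERROR is terminal
}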

Derived from a patch for IMPALA-5384 by Marcel Kornacker.

Testing:
- exhaustive functional tests
- stress test on minicluster with memory overcommitment. Verified from
  the logs that this exercises all these paths:
  - successful queries
  - client requested cancellation
  - error from exec FInstances RPC
  - error reported asynchronously via report status RPC
  - eos before backend execution completed

Change-Id: I1abdfd02163f9356c59d470fe1c64ebe012a9e8e
Reviewed-on: http://gerrit.cloudera.org:8080/10158
Reviewed-by: Dan Hecht <dhe...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/10389


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/da329442
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/da329442
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/da329442

Branch: refs/heads/2.x
Commit: da329442a598c29c97b9f43964c1b2af263c8391
Parents: 22244bb
Author: Dan Hecht <dhe...@cloudera.com>
Authored: Fri Apr 13 16:51:25 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Mon May 14 22:00:38 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator-backend-state.h |   8 +
 be/src/runtime/coordinator.cc              | 424 +++++++++++-------------
 be/src/runtime/coordinator.h               | 333 ++++++++++---------
 be/src/service/client-request-state.cc     |   2 +-
 be/src/service/impala-server.h             |   5 -
 be/src/util/counting-barrier.h             |  21 +-
 6 files changed, 397 insertions(+), 396 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/da329442/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index d2f122c..e7af2e2 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -21,9 +21,17 @@
 #include <vector>
 #include <unordered_set>
 
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/max.hpp>
+#include <boost/accumulators/statistics/mean.hpp>
+#include <boost/accumulators/statistics/median.hpp>
+#include <boost/accumulators/statistics/min.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+#include <boost/accumulators/statistics/variance.hpp>
 #include <boost/thread/mutex.hpp>
 
 #include "runtime/coordinator.h"
+#include "scheduling/query-schedule.h"
 #include "util/progress-updater.h"
 #include "util/stopwatch.h"
 #include "util/runtime-profile.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/da329442/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index d6a70e7..db07a3f 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -25,6 +25,7 @@
 #include <boost/algorithm/string.hpp>
 #include <gutil/strings/substitute.h>
 
+#include "common/hdfs.h"
 #include "exec/data-sink.h"
 #include "exec/plan-root-sink.h"
 #include "gen-cpp/ImpalaInternalService.h"
@@ -39,6 +40,7 @@
 #include "runtime/query-state.h"
 #include "scheduling/admission-controller.h"
 #include "scheduling/scheduler.h"
+#include "scheduling/query-schedule.h"
 #include "util/bloom-filter.h"
 #include "util/counting-barrier.h"
 #include "util/hdfs-bulk-ops.h"
@@ -51,16 +53,13 @@
 
 using namespace apache::thrift;
 using namespace rapidjson;
-using namespace strings;
 using boost::algorithm::iequals;
 using boost::algorithm::is_any_of;
 using boost::algorithm::join;
 using boost::algorithm::token_compress_on;
 using boost::algorithm::split;
 using boost::filesystem::path;
-using std::unique_ptr;
 
-DECLARE_int32(be_port);
 DECLARE_string(hostname);
 
 using namespace impala;
@@ -76,11 +75,9 @@ Coordinator::Coordinator(
     query_events_(events) {}
 
 Coordinator::~Coordinator() {
-  DCHECK(released_exec_resources_)
-      << "ReleaseExecResources() must be called before Coordinator is destroyed";
-  DCHECK(released_admission_control_resources_)
-      << "ReleaseAdmissionControlResources() must be called before Coordinator is "
-      << "destroyed";
+  // Must have entered a terminal exec state guaranteeing resources were released.
+  DCHECK_NE(exec_state_, ExecState::EXECUTING);
+  // Release the coordinator's reference to the query control structures.
   if (query_state_ != nullptr) {
     ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
   }
@@ -109,12 +106,6 @@ Status Coordinator::Exec() {
   bool is_mt_execution = request.query_ctx.client_request.query_options.mt_dop > 0;
   if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF;
 
-  // to keep things simple, make async Cancel() calls wait until plan fragment
-  // execution has been initiated, otherwise we might try to cancel fragment
-  // execution at Impala daemons where it hasn't even started
-  // TODO: revisit this, it may not be true anymore
-  lock_guard<mutex> l(lock_);
-
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx());
   query_state_->AcquireExecResourceRefcount(); // Decremented in ReleaseExecResources().
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
@@ -138,9 +129,9 @@ Status Coordinator::Exec() {
     InitFilterRoutingTable();
   }
 
-  // At this point, all static setup is done and all structures are initialized.
-  // Only runtime-related state changes past this point (examples:
-  // num_remaining_backends_, fragment instance profiles, etc.)
+  // At this point, all static setup is done and all structures are initialized. Only
+  // runtime-related state changes past this point (examples: fragment instance
+  // profiles, etc.)
 
   StartBackendExec();
   RETURN_IF_ERROR(FinishBackendStartup());
@@ -155,7 +146,7 @@ Status Coordinator::Exec() {
       // which means we failed Prepare
       Status prepare_status = query_state_->WaitForPrepare();
       DCHECK(!prepare_status.ok());
-      return prepare_status;
+      return UpdateExecState(prepare_status, nullptr, FLAGS_hostname);
     }
 
     // When GetFInstanceState() returns the coordinator instance, the Prepare phase
@@ -169,7 +160,6 @@ Status Coordinator::Exec() {
     coord_sink_ = coord_instance_->root_sink();
     DCHECK(coord_sink_ != nullptr);
   }
-
   return Status::OK();
 }
 
@@ -208,6 +198,8 @@ void Coordinator::InitFragmentStats() {
 void Coordinator::InitBackendStates() {
   int num_backends = schedule_.per_backend_exec_params().size();
   DCHECK_GT(num_backends, 0);
+
+  lock_guard<SpinLock> l(backend_states_init_lock_);
   backend_states_.resize(num_backends);
 
   RuntimeProfile::Counter* num_backends_counter =
@@ -215,19 +207,13 @@ void Coordinator::InitBackendStates() {
   num_backends_counter->Set(num_backends);
 
   // create BackendStates
-  bool has_coord_fragment = schedule_.GetCoordFragment() != nullptr;
-  const TNetworkAddress& coord_address = ExecEnv::GetInstance()->backend_address();
   int backend_idx = 0;
   for (const auto& entry: schedule_.per_backend_exec_params()) {
-    if (has_coord_fragment && coord_address == entry.first) {
-      coord_backend_idx_ = backend_idx;
-    }
     BackendState* backend_state = obj_pool()->Add(
         new BackendState(query_id(), backend_idx, filter_mode_));
     backend_state->Init(entry.second, fragment_stats_, obj_pool());
     backend_states_[backend_idx++] = backend_state;
   }
-  DCHECK(!has_coord_fragment || coord_backend_idx_ != -1);
 }
 
 void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) {
@@ -341,8 +327,8 @@ void Coordinator::InitFilterRoutingTable() {
 
 void Coordinator::StartBackendExec() {
   int num_backends = backend_states_.size();
-  exec_complete_barrier_.reset(new CountingBarrier(num_backends));
-  num_remaining_backends_ = num_backends;
+  exec_rpcs_complete_barrier_.reset(new CountingBarrier(num_backends));
+  backend_exec_complete_barrier_.reset(new CountingBarrier(num_backends));
 
   DebugOptions debug_options(schedule_.query_options());
 
@@ -354,11 +340,11 @@ void Coordinator::StartBackendExec() {
     ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer(
         [backend_state, this, &debug_options]() {
           backend_state->Exec(query_ctx(), debug_options, filter_routing_table_,
-            exec_complete_barrier_.get());
+              exec_rpcs_complete_barrier_.get());
         });
   }
+  exec_rpcs_complete_barrier_->Wait();
 
-  exec_complete_barrier_->Wait();
   VLOG_QUERY << "started execution on " << num_backends << " backends for query_id="
              << PrintId(query_id());
   query_events_->MarkEvent(
@@ -367,26 +353,24 @@ void Coordinator::StartBackendExec() {
 }
 
 Status Coordinator::FinishBackendStartup() {
-  Status status = Status::OK();
   const TMetricDef& def =
       MakeTMetricDef("backend-startup-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS);
   // Capture up to 30 minutes of start-up times, in ms, with 4 s.f. accuracy.
   HistogramMetric latencies(def, 30 * 60 * 1000, 4);
+  Status status = Status::OK();
+  string error_hostname;
   for (BackendState* backend_state: backend_states_) {
     // preserve the first non-OK, if there is one
     Status backend_status = backend_state->GetStatus();
-    if (!backend_status.ok() && status.ok()) status = backend_status;
+    if (!backend_status.ok() && status.ok()) {
+      status = backend_status;
+      error_hostname = backend_state->impalad_address().hostname;
+    }
     latencies.Update(backend_state->rpc_latency());
   }
-
   query_profile_->AddInfoString(
       "Backend startup latencies", latencies.ToHumanReadable());
-
-  if (!status.ok()) {
-    query_status_ = status;
-    CancelInternal();
-  }
-  return status;
+  return UpdateExecState(status, nullptr, error_hostname);
 }
 
 string Coordinator::FilterDebugString() {
@@ -446,40 +430,115 @@ string Coordinator::FilterDebugString() {
   return Substitute("\n$0", table_printer.ToString());
 }
 
-Status Coordinator::GetStatus() {
-  lock_guard<mutex> l(lock_);
-  return query_status_;
+const char* Coordinator::ExecStateToString(const ExecState state) {
+  static const unordered_map<ExecState, const char *> exec_state_to_str{
+    {ExecState::EXECUTING,        "EXECUTING"},
+    {ExecState::RETURNED_RESULTS, "RETURNED_RESULTS"},
+    {ExecState::CANCELLED,        "CANCELLED"},
+    {ExecState::ERROR,            "ERROR"}};
+  return exec_state_to_str.at(state);
 }
 
-Status Coordinator::UpdateStatus(const Status& status, const string& backend_hostname,
-    bool is_fragment_failure, const TUniqueId& instance_id) {
+Status Coordinator::SetNonErrorTerminalState(const ExecState state) {
+  DCHECK(state == ExecState::RETURNED_RESULTS || state == ExecState::CANCELLED);
+  Status ret_status;
   {
-    lock_guard<mutex> l(lock_);
-
-    // The query is done and we are just waiting for backends to clean up.
-    // Ignore their cancelled updates.
-    if (returned_all_results_ && status.IsCancelled()) return query_status_;
-
-    // nothing to update
-    if (status.ok()) return query_status_;
-
-    // don't override an error status; also, cancellation has already started
-    if (!query_status_.ok()) return query_status_;
-
-    query_status_ = status;
-    CancelInternal();
-  }
-
-  if (is_fragment_failure) {
-    // Log the id of the fragment that first failed so we can track it down more easily.
-    VLOG_QUERY << "query_id=" << PrintId(query_id())
-               << " failed because fragment_instance_id=" << PrintId(instance_id)
-               << " on host=" << backend_hostname << " failed.";
+    lock_guard<SpinLock> l(exec_state_lock_);
+    // May have already entered a terminal state, in which case nothing to do.
+    if (exec_state_ != ExecState::EXECUTING) return exec_status_;
+    DCHECK(exec_status_.ok()) << exec_status_;
+    exec_state_ = state;
+    if (state == ExecState::CANCELLED) exec_status_ = Status::CANCELLED;
+    ret_status = exec_status_;
+  }
+  VLOG_QUERY << Substitute("ExecState: query id=$0 execution $1", PrintId(query_id()),
+      state == ExecState::CANCELLED ? "cancelled" : "completed");
+  HandleExecStateTransition(ExecState::EXECUTING, state);
+  return ret_status;
+}
+
+Status Coordinator::UpdateExecState(const Status& status,
+    const TUniqueId* failed_finst, const string& instance_hostname) {
+  Status ret_status;
+  ExecState old_state, new_state;
+  {
+    lock_guard<SpinLock> l(exec_state_lock_);
+    old_state = exec_state_;
+    if (old_state == ExecState::EXECUTING) {
+      DCHECK(exec_status_.ok()) << exec_status_;
+      if (!status.ok()) {
+        // Error while executing - go to ERROR state.
+        exec_status_ = status;
+        exec_state_ = ExecState::ERROR;
+      }
+    } else if (old_state == ExecState::RETURNED_RESULTS) {
+      // Already returned all results. Leave exec status as ok, stay in this state.
+      DCHECK(exec_status_.ok()) << exec_status_;
+    } else if (old_state == ExecState::CANCELLED) {
+      // Client requested cancellation already, stay in this state.  Ignores errors
+      // after requested cancellations.
+      DCHECK(exec_status_.IsCancelled()) << exec_status_;
+    } else {
+      // Already in the ERROR state, stay in this state but update status to be the
+      // first non-cancelled status.
+      DCHECK_EQ(old_state, ExecState::ERROR);
+      DCHECK(!exec_status_.ok());
+      if (!status.ok() && !status.IsCancelled() && exec_status_.IsCancelled()) {
+        exec_status_ = status;
+      }
+    }
+    new_state = exec_state_;
+    ret_status = exec_status_;
+  }
+  // Log interesting status: a non-cancelled error or a cancellation if was executing.
+  if (!status.ok() && (!status.IsCancelled() || old_state == ExecState::EXECUTING)) {
+    VLOG_QUERY << Substitute(
+        "ExecState: query id=$0 finstance=$1 on host=$2 ($3 -> $4) status=$5",
+        PrintId(query_id()), failed_finst != nullptr ? PrintId(*failed_finst) : "N/A",
+        instance_hostname, ExecStateToString(old_state), ExecStateToString(new_state),
+        status.GetDetail());
+  }
+  // After dropping the lock, apply the state transition (if any) side-effects.
+  HandleExecStateTransition(old_state, new_state);
+  return ret_status;
+}
+
+bool Coordinator::ReturnedAllResults() {
+  lock_guard<SpinLock> l(exec_state_lock_);
+  return exec_state_ == ExecState::RETURNED_RESULTS;
+}
+
+void Coordinator::HandleExecStateTransition(
+    const ExecState old_state, const ExecState new_state) {
+  static const unordered_map<ExecState, const char *> exec_state_to_event{
+    {ExecState::EXECUTING,        "Executing"},
+    {ExecState::RETURNED_RESULTS, "Last row fetched"},
+    {ExecState::CANCELLED,        "Execution cancelled"},
+    {ExecState::ERROR,            "Execution error"}};
+  if (old_state == new_state) return;
+  // Once we enter a terminal state, we stay there, guaranteeing this code runs only once.
+  DCHECK_EQ(old_state, ExecState::EXECUTING);
+  // Should never transition to the initial state.
+  DCHECK_NE(new_state, ExecState::EXECUTING);
+
+  query_events_->MarkEvent(exec_state_to_event.at(new_state));
+  // TODO: IMPALA-7011 is this needed? This will also happen on the "backend" side of
+  // cancel rpc. And in the case of EOS, sink already knows this.
+  if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
+  // This thread won the race to transitioning into a terminal state - terminate
+  // execution and release resources.
+  ReleaseExecResources();
+  if (new_state == ExecState::RETURNED_RESULTS) {
+    // TODO: IMPALA-6984: cancel all backends in this case too.
+    WaitForBackends();
   } else {
-    VLOG_QUERY << "query_id=" << PrintId(query_id()) << " failed due to error on host="
-               << backend_hostname;
+    CancelBackends();
   }
-  return query_status_;
+  ReleaseAdmissionControlResources();
+  // Can compute summary only after we stop accepting reports from the backends. Both
+  // WaitForBackends() and CancelBackends() ensures that.
+  // TODO: should move this off of the query execution path?
+  ComputeQuerySummary();
 }
 
 Status Coordinator::FinalizeHdfsInsert() {
@@ -491,7 +550,7 @@ Status Coordinator::FinalizeHdfsInsert() {
 
   VLOG_QUERY << "Finalizing query: " << PrintId(query_id());
   SCOPED_TIMER(finalization_timer_);
-  Status return_status = GetStatus();
+  Status return_status = UpdateExecState(Status::OK(), nullptr, FLAGS_hostname);
   if (return_status.ok()) {
     HdfsTableDescriptor* hdfs_table;
     RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx().desc_tbl,
@@ -517,22 +576,13 @@ Status Coordinator::FinalizeHdfsInsert() {
   return return_status;
 }
 
-Status Coordinator::WaitForBackendCompletion() {
-  unique_lock<mutex> l(lock_);
-  while (num_remaining_backends_ > 0 && query_status_.ok()) {
-    VLOG_QUERY << "Coordinator waiting for backends to finish, "
-               << num_remaining_backends_ << " remaining. query_id="
-               << PrintId(query_id());
-    backend_completion_cv_.Wait(l);
+void Coordinator::WaitForBackends() {
+  int32_t num_remaining = backend_exec_complete_barrier_->pending();
+  if (num_remaining > 0) {
+    VLOG_QUERY << "Coordinator waiting for backends to finish, " << num_remaining
+               << " remaining. query_id=" << PrintId(query_id());
+    backend_exec_complete_barrier_->Wait();
   }
-  if (query_status_.ok()) {
-    VLOG_QUERY << "All backends finished successfully. query_id=" << PrintId(query_id());
-  } else {
-    VLOG_QUERY << "All backends finished due to one or more errors. query_id="
-               << PrintId(query_id()) << ". " << query_status_.GetDetail();
-  }
-
-  return query_status_;
 }
 
 Status Coordinator::Wait() {
@@ -543,34 +593,22 @@ Status Coordinator::Wait() {
 
   if (stmt_type_ == TStmtType::QUERY) {
     DCHECK(coord_instance_ != nullptr);
-    return UpdateStatus(coord_instance_->WaitForOpen(), FLAGS_hostname, true,
-        runtime_state()->fragment_instance_id());
+    return UpdateExecState(coord_instance_->WaitForOpen(),
+        &coord_instance_->runtime_state()->fragment_instance_id(), FLAGS_hostname);
   }
-
   DCHECK_EQ(stmt_type_, TStmtType::DML);
-  // Query finalization can only happen when all backends have reported relevant
-  // state. They only have relevant state to report in the parallel INSERT case,
-  // otherwise all the relevant state is from the coordinator fragment which will be
-  // available after Open() returns.  Ignore the returned status if finalization is
-  // required., since FinalizeHdfsInsert() will pick it up and needs to execute
-  // regardless.
-  Status status = WaitForBackendCompletion();
-  if (finalize_params() == nullptr && !status.ok()) return status;
-
-  // Execution of query fragments has finished. We don't need to hold onto query execution
-  // resources while we finalize the query.
-  ReleaseExecResources();
-  // Query finalization is required only for HDFS table sinks
-  if (finalize_params() != nullptr) RETURN_IF_ERROR(FinalizeHdfsInsert());
-  // Release admission control resources after we'd done the potentially heavyweight
-  // finalization.
-  ReleaseAdmissionControlResources();
-
+  // DML finalization can only happen when all backends have completed all side-effects
+  // and reported relevant state.
+  WaitForBackends();
+  if (finalize_params() != nullptr) {
+    RETURN_IF_ERROR(UpdateExecState(
+            FinalizeHdfsInsert(), nullptr, FLAGS_hostname));
+  }
+  // DML requests are finished at this point.
+  RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
   query_profile_->AddInfoString(
       "DML Stats", dml_exec_state_.OutputPartitionStats("\n"));
-  // For DML queries, when Wait is done, the query is complete.
-  ComputeQuerySummary();
-  return status;
+  return Status::OK();
 }
 
 Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
@@ -578,88 +616,54 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
   DCHECK(has_called_wait_);
   SCOPED_TIMER(query_profile_->total_time_counter());
 
-  if (returned_all_results_) {
-    // May be called after the first time we set *eos. Re-set *eos and return here;
-    // already torn-down coord_sink_ so no more work to do.
+  // exec_state_lock_ not needed here though since this path won't execute concurrently
+  // with itself or DML finalization.
+  if (exec_state_ == ExecState::RETURNED_RESULTS) {
+    // Nothing left to do: already in a terminal state and no more results.
     *eos = true;
     return Status::OK();
   }
+  DCHECK(coord_instance_ != nullptr) << "Exec() should be called first";
+  DCHECK(coord_sink_ != nullptr)     << "Exec() should be called first";
+  RuntimeState* runtime_state = coord_instance_->runtime_state();
 
-  DCHECK(coord_sink_ != nullptr)
-      << "GetNext() called without result sink. Perhaps Prepare() failed and was not "
-      << "checked?";
-  Status status = coord_sink_->GetNext(runtime_state(), results, max_rows, eos);
-
-  // if there was an error, we need to return the query's error status rather than
-  // the status we just got back from the local executor (which may well be CANCELLED
-  // in that case).  Coordinator fragment failed in this case so we log the query_id.
-  RETURN_IF_ERROR(UpdateStatus(status, FLAGS_hostname, true,
-      runtime_state()->fragment_instance_id()));
-
-  if (*eos) {
-    returned_all_results_ = true;
-    query_events_->MarkEvent("Last row fetched");
-    // release query execution resources here, since we won't be fetching more result rows
-    ReleaseExecResources();
-    // wait for all backends to complete before computing the summary
-    // TODO: relocate this so GetNext() won't have to wait for backends to complete?
-    RETURN_IF_ERROR(WaitForBackendCompletion());
-    // Release admission control resources after backends are finished.
-    ReleaseAdmissionControlResources();
-    // if the query completed successfully, compute the summary
-    if (query_status_.ok()) ComputeQuerySummary();
-  }
-
+  Status status = coord_sink_->GetNext(runtime_state, results, max_rows, eos);
+  RETURN_IF_ERROR(UpdateExecState(
+          status, &runtime_state->fragment_instance_id(), FLAGS_hostname));
+  if (*eos) RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
   return Status::OK();
 }
 
-void Coordinator::Cancel(const Status* cause) {
-  lock_guard<mutex> l(lock_);
-  // if the query status indicates an error, cancellation has already been initiated;
-  // prevent others from cancelling a second time
-  if (!query_status_.ok()) return;
-
-  // TODO: This should default to OK(), not CANCELLED if there is no cause (or callers
-  // should explicitly pass Status::OK()). Fragment instances may be cancelled at the end
-  // of a successful query. Need to clean up relationship between query_status_ here and
-  // in QueryExecState. See IMPALA-4279.
-  query_status_ = (cause != nullptr && !cause->ok()) ? *cause : Status::CANCELLED;
-  CancelInternal();
+void Coordinator::Cancel() {
+  // Illegal to call Cancel() before Exec() returns, so there's no danger of the cancel
+  // RPC passing the exec RPC.
+  DCHECK(exec_rpcs_complete_barrier_ != nullptr &&
+      exec_rpcs_complete_barrier_->pending() <= 0) << "Exec() must be called first";
+  discard_result(SetNonErrorTerminalState(ExecState::CANCELLED));
 }
 
-void Coordinator::CancelInternal() {
-  VLOG_QUERY << "Cancel() query_id=" << PrintId(query_id());
-  // TODO: remove when restructuring cancellation, which should happen automatically
-  // as soon as the coordinator knows that the query is finished
-  DCHECK(!query_status_.ok());
-
+void Coordinator::CancelBackends() {
   int num_cancelled = 0;
   for (BackendState* backend_state: backend_states_) {
     DCHECK(backend_state != nullptr);
     if (backend_state->Cancel()) ++num_cancelled;
   }
+  backend_exec_complete_barrier_->NotifyRemaining();
+
   VLOG_QUERY << Substitute(
       "CancelBackends() query_id=$0, tried to cancel $1 backends",
       PrintId(query_id()), num_cancelled);
-  backend_completion_cv_.NotifyAll();
-
-  ReleaseExecResourcesLocked();
-  ReleaseAdmissionControlResourcesLocked();
-  // Report the summary with whatever progress the query made before being 
cancelled.
-  ComputeQuerySummary();
 }
 
 Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& params) {
-  VLOG_FILE << "UpdateBackendExecStatus()  backend_idx=" << params.coord_state_idx;
+  VLOG_FILE << "UpdateBackendExecStatus() query_id=" << PrintId(query_id())
+            << " backend_idx=" << params.coord_state_idx;
   if (params.coord_state_idx >= backend_states_.size()) {
     return Status(TErrorCode::INTERNAL_ERROR,
         Substitute("Unknown backend index $0 (max known: $1)",
             params.coord_state_idx, backend_states_.size() - 1));
   }
   BackendState* backend_state = backend_states_[params.coord_state_idx];
-  // TODO: return here if returned_all_results_?
-  // TODO: return CANCELLED in that case? Although that makes the cancellation propagation
-  // path more irregular.
 
   // TODO: only do this when the sink is done; probably missing a done field
   // in TReportExecStatus for that
@@ -668,46 +672,30 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
   }
 
   if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) {
-    // This report made this backend done, so update the status and
-    // num_remaining_backends_.
-
-    // for now, abort the query if we see any error except if returned_all_results_ is
-    // true (UpdateStatus() initiates cancellation, if it hasn't already been)
-    // TODO: clarify control flow here, it's unclear we should even process this status
-    // report if returned_all_results_ is true
+    // This backend execution has completed.
     bool is_fragment_failure;
     TUniqueId failed_instance_id;
     Status status = backend_state->GetStatus(&is_fragment_failure, &failed_instance_id);
-    if (!status.ok() && !returned_all_results_) {
-      Status ignored =
-          UpdateStatus(status, TNetworkAddressToString(backend_state->impalad_address()),
-              is_fragment_failure, failed_instance_id);
-      return Status::OK();
-    }
-
-    lock_guard<mutex> l(lock_);
-    DCHECK_GT(num_remaining_backends_, 0);
-    if (VLOG_QUERY_IS_ON && num_remaining_backends_ > 1) {
-      VLOG_QUERY << "Backend completed: "
-          << " host=" << TNetworkAddressToString(backend_state->impalad_address())
-          << " remaining=" << num_remaining_backends_ - 1
-          << " query_id=" << PrintId(query_id());
+    int pending_backends = backend_exec_complete_barrier_->Notify();
+    if (VLOG_QUERY_IS_ON && pending_backends >= 0) {
+      VLOG_QUERY << "Backend completed:"
+                 << " host=" << TNetworkAddressToString(backend_state->impalad_address())
+                 << " remaining=" << pending_backends
+                 << " query_id=" << PrintId(query_id());
       BackendState::LogFirstInProgress(backend_states_);
     }
-    if (--num_remaining_backends_ == 0 || !status.ok()) {
-      backend_completion_cv_.NotifyAll();
+    if (!status.ok()) {
+      // TODO: IMPALA-5119: call UpdateExecState() asynchronously rather than
+      // from within this RPC handler (since it can make RPCs).
+      discard_result(UpdateExecState(status,
+              is_fragment_failure ? &failed_instance_id : nullptr,
+              TNetworkAddressToString(backend_state->impalad_address())));
     }
-    return Status::OK();
   }
   // If all results have been returned, return a cancelled status to force the fragment
   // instance to stop executing.
-  if (returned_all_results_) return Status::CANCELLED;
-
-  return Status::OK();
-}
-
-RuntimeState* Coordinator::runtime_state() {
-  return coord_instance_ == nullptr ? nullptr : coord_instance_->runtime_state();
+  // TODO: Make returning CANCELLED unnecessary with IMPALA-6984.
+  return ReturnedAllResults() ? Status::CANCELLED : Status::OK();
 }
 
 // TODO: add histogram/percentile
@@ -740,20 +728,14 @@ void Coordinator::ComputeQuerySummary() {
 
 string Coordinator::GetErrorLog() {
   ErrorLogMap merged;
-  for (BackendState* state: backend_states_) {
-    state->MergeErrorLog(&merged);
+  {
+    lock_guard<SpinLock> l(backend_states_init_lock_);
+    for (BackendState* state: backend_states_) state->MergeErrorLog(&merged);
   }
   return PrintErrorMapToString(merged);
 }
 
 void Coordinator::ReleaseExecResources() {
-  lock_guard<mutex> l(lock_);
-  ReleaseExecResourcesLocked();
-}
-
-void Coordinator::ReleaseExecResourcesLocked() {
-  if (released_exec_resources_) return;
-  released_exec_resources_ = true;
   if (filter_routing_table_.size() > 0) {
     query_profile_->AddInfoString("Final filter table", FilterDebugString());
   }
@@ -767,8 +749,6 @@ void Coordinator::ReleaseExecResourcesLocked() {
   }
   // This may be NULL while executing UDFs.
   if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
-  // Need to protect against failed Prepare(), where root_sink() would not be set.
-  if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
   // Now that we've released our own resources, can release query-wide resources.
   if (query_state_ != nullptr) query_state_->ReleaseExecResourceRefcount();
   // At this point some tracked memory may still be used in the coordinator for result
@@ -776,27 +756,20 @@ void Coordinator::ReleaseExecResourcesLocked() {
 }
 
 void Coordinator::ReleaseAdmissionControlResources() {
-  lock_guard<mutex> l(lock_);
-  ReleaseAdmissionControlResourcesLocked();
-}
-
-void Coordinator::ReleaseAdmissionControlResourcesLocked() {
-  if (released_admission_control_resources_) return;
-  LOG(INFO) << "Release admission control resources for query_id="
-            << PrintId(query_ctx().query_id);
+  LOG(INFO) << "Release admission control resources for query_id=" << PrintId(query_id());
   AdmissionController* admission_controller =
       ExecEnv::GetInstance()->admission_controller();
   if (admission_controller != nullptr) admission_controller->ReleaseQuery(schedule_);
-  released_admission_control_resources_ = true;
   query_events_->MarkEvent("Released admission control resources");
 }
 
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilter() called although runtime filters are disabled";
-  DCHECK(exec_complete_barrier_.get() != nullptr)
+  DCHECK(exec_rpcs_complete_barrier_.get() != nullptr)
       << "Filters received before fragments started!";
-  exec_complete_barrier_->Wait();
+
+  exec_rpcs_complete_barrier_->Wait();
   DCHECK(filter_routing_table_complete_)
       << "Filter received before routing table complete";
 
@@ -867,6 +840,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   rpc_params.__set_dst_query_id(query_id());
   rpc_params.__set_filter_id(params.filter_id);
 
+  // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
   for (BackendState* bs: backend_states_) {
     for (int fragment_idx: target_fragment_idxs) {
       rpc_params.__set_dst_fragment_idx(fragment_idx);
@@ -940,23 +914,19 @@ void Coordinator::FilterState::Disable(MemTracker* tracker) {
   }
 }
 
-const TUniqueId& Coordinator::query_id() const {
-  return query_ctx().query_id;
-}
-
 void Coordinator::GetTExecSummary(TExecSummary* exec_summary) {
   lock_guard<SpinLock> l(exec_summary_.lock);
   *exec_summary = exec_summary_.thrift_exec_summary;
 }
 
 MemTracker* Coordinator::query_mem_tracker() const {
-  return query_state()->query_mem_tracker();
+  return query_state_->query_mem_tracker();
 }
 
 void Coordinator::BackendsToJson(Document* doc) {
   Value states(kArrayType);
   {
-    lock_guard<mutex> l(lock_);
+    lock_guard<SpinLock> l(backend_states_init_lock_);
     for (BackendState* state : backend_states_) {
       Value val(kObjectType);
       state->ToJson(&val, doc);
@@ -969,7 +939,7 @@ void Coordinator::BackendsToJson(Document* doc) {
 void Coordinator::FInstanceStatsToJson(Document* doc) {
   Value states(kArrayType);
   {
-    lock_guard<mutex> l(lock_);
+    lock_guard<SpinLock> l(backend_states_init_lock_);
     for (BackendState* state : backend_states_) {
       Value val(kObjectType);
       state->InstanceStatsToJson(&val, doc);
@@ -979,6 +949,14 @@ void Coordinator::FInstanceStatsToJson(Document* doc) {
   doc->AddMember("backend_instances", states, doc->GetAllocator());
 }
 
+const TQueryCtx& Coordinator::query_ctx() const {
+  return schedule_.request().query_ctx;
+}
+
+const TUniqueId& Coordinator::query_id() const {
+  return query_ctx().query_id;
+}
+
 const TFinalizeParams* Coordinator::finalize_params() const {
   return schedule_.request().__isset.finalize_params
       ? &schedule_.request().finalize_params : nullptr;

http://git-wip-us.apache.org/repos/asf/impala/blob/da329442/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 8e556ec..a0b9b4c 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -20,29 +20,19 @@
 
 #include <string>
 #include <vector>
-#include <boost/accumulators/accumulators.hpp>
-#include <boost/accumulators/statistics/max.hpp>
-#include <boost/accumulators/statistics/mean.hpp>
-#include <boost/accumulators/statistics/median.hpp>
-#include <boost/accumulators/statistics/min.hpp>
-#include <boost/accumulators/statistics/stats.hpp>
-#include <boost/accumulators/statistics/variance.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
 #include <rapidjson/document.h>
 
 #include "common/global-types.h"
-#include "common/hdfs.h"
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/dml-exec-state.h"
-#include "runtime/query-state.h"
-#include "scheduling/query-schedule.h"
 #include "util/condition-variable.h"
 #include "util/progress-updater.h"
+#include "util/runtime-profile-counters.h"
 
 namespace impala {
 
@@ -55,6 +45,7 @@ class TPlanExecRequest;
 class TRuntimeProfileTree;
 class RuntimeProfile;
 class QueryResultSet;
+class QuerySchedule;
 class MemTracker;
 class PlanRootSink;
 class FragmentInstanceState;
@@ -64,10 +55,9 @@ class QueryState;
 /// Query coordinator: handles execution of fragment instances on remote nodes, given a
 /// TQueryExecRequest. As part of that, it handles all interactions with the executing
 /// backends; it is also responsible for implementing all client requests regarding the
-/// query, including cancellation. Once a query ends, either through cancellation or
-/// by returning eos, the coordinator releases resources. (Note that DML requests
-/// always end with cancellation, via ImpalaServer::UnregisterQuery()/
-/// ImpalaServer::CancelInternal()/ClientRequestState::Cancel().)
+/// query, including cancellation. Once a query ends, either by returning EOS, through
+/// client cancellation, returning an error, or by finalizing a DML request, the
+/// coordinator releases resources.
 ///
 /// The coordinator monitors the execution status of fragment instances and aborts the
 /// entire query if an error is reported by any of them.
@@ -76,80 +66,80 @@ class QueryState;
 /// rows are produced by a fragment instance that always executes on the same machine as
 /// the coordinator.
 ///
-/// Thread-safe, with the exception of GetNext().
-//
+/// Thread-safe except where noted.
+///
 /// A typical sequence of calls for a single query (calls under the same numbered
 /// item can happen concurrently):
 /// 1. client: Exec()
 /// 2. client: Wait()/client: Cancel()/backend: UpdateBackendExecStatus()
 /// 3. client: GetNext()*/client: Cancel()/backend: UpdateBackendExecStatus()
 ///
-/// The implementation ensures that setting an overall error status and initiating
-/// cancellation of all fragment instances is atomic.
+/// A query is considered to be executing until one of three things occurs:
+/// 1. An error is encountered. Backend cancellation is automatically initiated for all
+///    backends that haven't yet completed and the overall query status is set to the
+///    first (non-cancelled) encountered error status.
+/// 2. The query is cancelled via an explicit Cancel() call. The overall query status
+///    is set to CANCELLED and cancellation is initiated for all backends still
+///    executing (without an error status).
+/// 3. The query has returned all rows. The overall query status is OK (and remains
+///    OK). Client cancellation is no longer possible and subsequent backend errors are
+///    ignored. (TODO: IMPALA-6984 initiate backend cancellation in this case).
+///
+/// Lifecycle: this object must not be destroyed until after one of the three states
+/// above is reached (error, cancelled, or EOS) to ensure resources are released.
+///
+/// Lock ordering: (lower-numbered acquired before higher-numbered)
+/// 1. wait_lock_
+/// 2. exec_state_lock_, backend_states_init_lock_, filter_lock_,
+///    ExecSummary::lock (leafs)
 ///
 /// TODO: move into separate subdirectory and move nested classes into separate files
 /// and unnest them
-/// TODO: clean up locking behavior; in particular, clarify dependency on lock_
-/// TODO: clarify cancellation path; in particular, cancel as soon as we return
-/// all results
 class Coordinator { // NOLINT: The member variables could be re-ordered to save space
  public:
   Coordinator(const QuerySchedule& schedule, RuntimeProfile::EventSequence* events);
   ~Coordinator();
 
-  /// Initiate asynchronous execution of a query with the given schedule. When it returns,
-  /// all fragment instances have started executing at their respective backends.
-  /// A call to Exec() must precede all other member function calls.
+  /// Initiate asynchronous execution of a query with the given schedule. When it
+  /// returns, all fragment instances have started executing at their respective
+  /// backends. Exec() must be called exactly once and a call to Exec() must precede
+  /// all other member function calls.
   Status Exec() WARN_UNUSED_RESULT;
 
   /// Blocks until result rows are ready to be retrieved via GetNext(), or, if the
-  /// query doesn't return rows, until the query finishes or is cancelled.
-  /// A call to Wait() must precede all calls to GetNext().
-  /// Multiple calls to Wait() are idempotent and it is okay to issue multiple
-  /// Wait() calls concurrently.
+  /// query doesn't return rows, until the query finishes or is cancelled. A call to
+  /// Wait() must precede all calls to GetNext().  Multiple calls to Wait() are
+  /// idempotent and it is okay to issue multiple Wait() calls concurrently.
   Status Wait() WARN_UNUSED_RESULT;
 
   /// Fills 'results' with up to 'max_rows' rows. May return fewer than 'max_rows'
-  /// rows, but will not return more.
-  ///
-  /// If *eos is true, execution has completed. Subsequent calls to GetNext() will be a
-  /// no-op.
-  ///
-  /// GetNext() will not set *eos=true until all fragment instances have either completed
-  /// or have failed.
-  ///
-  /// Returns an error status if an error was encountered either locally or by any of the
-  /// remote fragments or if the query was cancelled.
+  /// rows, but will not return more. If *eos is true, all rows have been returned.
+  /// Returns a non-OK status if an error was encountered either locally or by any of
+  /// the executing backends, or if the query was cancelled via Cancel().  After *eos
+  /// is true, subsequent calls to GetNext() will be a no-op.
   ///
   /// GetNext() is not thread-safe: multiple threads must not make concurrent GetNext()
-  /// calls (but may call any of the other member functions concurrently with GetNext()).
+  /// calls.
   Status GetNext(QueryResultSet* results, int max_rows, bool* eos) WARN_UNUSED_RESULT;
 
-  /// Cancel execution of query. This includes the execution of the local plan fragment,
-  /// if any, as well as all plan fragments on remote nodes. Sets query_status_ to the
-  /// given cause if non-NULL. Otherwise, sets query_status_ to Status::CANCELLED.
-  /// Idempotent.
-  void Cancel(const Status* cause = nullptr);
+  /// Cancel execution of query and sets the overall query status to CANCELLED if the
+  /// query is still executing. Idempotent.
+  void Cancel();
 
-  /// Updates execution status of a particular backend as well as dml_exec_state_.
-  /// Also updates num_remaining_backends_ and cancels execution if the backend has an
-  /// error status.
+  /// Called by the report status RPC handler to update execution status of a
+  /// particular backend as well as dml_exec_state_ and the profile.
   Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
       WARN_UNUSED_RESULT;
 
-  /// Only valid to call after Exec().
-  QueryState* query_state() const { return query_state_; }
-
   /// Get cumulative profile aggregated over all fragments of the query.
   /// This is a snapshot of the current state of execution and will change in
   /// the future if not all fragments have finished execution.
   RuntimeProfile* query_profile() const { return query_profile_; }
 
-  const TUniqueId& query_id() const;
-
+  /// Safe to call only after Exec().
   MemTracker* query_mem_tracker() const;
 
-  /// This is safe to call only after Wait()
+  /// Safe to call only after Wait().
   DmlExecState* dml_exec_state() { return &dml_exec_state_; }
 
   /// Return error log for coord and all the fragments. The error messages from the
@@ -158,9 +148,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   const ProgressUpdater& progress() { return progress_; }
 
-  /// Returns query_status_.
-  Status GetStatus();
-
   /// Get a copy of the current exec summary. Thread-safe.
   void GetTExecSummary(TExecSummary* exec_summary);
 
@@ -187,18 +174,20 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// owned by the ClientRequestState that owns this coordinator
   const QuerySchedule& schedule_;
 
-  /// copied from TQueryExecRequest, governs when to call ReportQuerySummary
+  /// Copied from TQueryExecRequest, governs when finalization occurs. Set in Exec().
   TStmtType::type stmt_type_;
 
-  /// BackendStates for all execution backends, including the coordinator.
-  /// All elements are non-nullptr. Owned by obj_pool(). Populated by
-  /// InitBackendExec().
+  /// BackendStates for all execution backends, including the coordinator. All elements
+  /// are non-nullptr and owned by obj_pool(). Populated by Exec()/InitBackendStates().
   std::vector<BackendState*> backend_states_;
 
-  // index into backend_states_ for coordinator fragment; -1 if no coordinator fragment
-  int coord_backend_idx_ = -1;
+  /// Protects the population of backend_states_ vector (not the BackendState objects).
+  /// Used when accessing backend_states_ if it's not guaranteed that
+  /// InitBackendStates() has completed.
+  SpinLock backend_states_init_lock_;
 
-  /// The QueryState for this coordinator. Set in Exec(). Released in TearDown().
+  /// The QueryState for this coordinator. Reference taken in Exec(). Reference
+  /// released in destructor.
   QueryState* query_state_ = nullptr;
 
   /// Non-null if and only if the query produces results for the client; i.e. is of
@@ -209,22 +198,28 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Result rows are materialized by this fragment instance in its own thread. They are
   /// materialized into a QueryResultSet provided to the coordinator during GetNext().
   ///
-  /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied
-  /// reference of QueryState released) in TearDown().
+  /// Owned by the QueryState. Set in Exec().
   FragmentInstanceState* coord_instance_ = nullptr;
 
-  /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() or when
-  /// GetNext() hits eos.
+  /// Owned by the QueryState. Set in Exec().
   PlanRootSink* coord_sink_ = nullptr;
 
-  /// ensures single-threaded execution of Wait(); must not hold lock_ when acquiring this
+  /// ensures single-threaded execution of Wait(). See lock ordering class comment.
   boost::mutex wait_lock_;
 
   bool has_called_wait_ = false;  // if true, Wait() was called; protected by wait_lock_
 
-  /// Keeps track of number of completed ranges and total scan ranges.
+  /// Keeps track of number of completed ranges and total scan ranges. Initialized by
+  /// Exec().
   ProgressUpdater progress_;
 
+  /// Aggregate counters for the entire query. Lives in 'obj_pool_'. Set in Exec().
+  RuntimeProfile* query_profile_ = nullptr;
+
+  /// Total time spent in finalization (typically 0 except for INSERT into hdfs
+  /// tables). Set in Exec().
+  RuntimeProfile::Counter* finalization_timer_ = nullptr;
+
   /// Total number of filter updates received (always 0 if filter mode is not
   /// GLOBAL). Excludes repeated broadcast filter updates. Set in Exec().
   RuntimeProfile::Counter* filter_updates_received_ = nullptr;
@@ -255,6 +250,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
     void Init(const QuerySchedule& query_schedule);
   };
 
+  // Initialized by Exec().
   ExecSummary exec_summary_;
 
   /// Filled in as the query completes and tracks the results of DML queries.  This is
@@ -262,52 +258,40 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// coordinator fragment: only one of the two can legitimately produce updates.
   DmlExecState dml_exec_state_;
 
-  /// Aggregate counters for the entire query. Lives in 'obj_pool_'.
-  RuntimeProfile* query_profile_ = nullptr;
-
-  /// Protects all fields below. This is held while making RPCs, so this lock should
-  /// only be acquired if the acquiring thread is prepared to wait for a significant
-  /// time.
-  /// TODO: clarify to what extent the fields below need to be protected by lock_
-  /// Lock ordering is
-  /// 1. wait_lock_
-  /// 2. lock_
-  /// 3. BackendState::lock_
-  /// 4. filter_lock_
-  boost::mutex lock_;
-
-  /// Overall status of the entire query; set to the first reported fragment error
-  /// status or to CANCELLED, if Cancel() is called.
-  Status query_status_;
-
-  /// If true, the query is done returning all results.  It is possible that the
-  /// coordinator still needs to wait for cleanup on remote fragments (e.g. queries
-  /// with limit)
-  /// Once this is set to true, errors from execution backends are ignored.
-  bool returned_all_results_ = false;
-
-  /// If there is no coordinator fragment, Wait() simply waits until all
-  /// backends report completion by notifying on backend_completion_cv_.
-  /// Tied to lock_.
-  ConditionVariable backend_completion_cv_;
-
-  /// Count of the number of backends for which done != true. When this
-  /// hits 0, any Wait()'ing thread is notified
-  int num_remaining_backends_ = 0;
-
   /// Event timeline for this query. Not owned.
   RuntimeProfile::EventSequence* query_events_ = nullptr;
 
-  /// Indexed by fragment idx (TPlanFragment.idx). Filled in InitFragmentStats(),
-  /// elements live in obj_pool().
+  /// Indexed by fragment idx (TPlanFragment.idx). Filled in
+  /// Exec()/InitFragmentStats(), elements live in obj_pool(). Updated by BackendState
+  /// sequentially, without synchronization.
   std::vector<FragmentStats*> fragment_stats_;
 
-  /// total time spent in finalization (typically 0 except for INSERT into hdfs tables)
-  RuntimeProfile::Counter* finalization_timer_ = nullptr;
+  /// Barrier that is released when all calls to BackendState::Exec() have
+  /// returned. Initialized in StartBackendExec().
+  boost::scoped_ptr<CountingBarrier> exec_rpcs_complete_barrier_;
+
+  /// Barrier that is released when all backends have indicated execution completion,
+  /// or when all backends are cancelled due to an execution error or client requested
+  /// cancellation. Initialized in StartBackendExec().
+  boost::scoped_ptr<CountingBarrier> backend_exec_complete_barrier_;
+
+  SpinLock exec_state_lock_; // protects exec_state_ and exec_status_
 
-  /// Barrier that is released when all calls to ExecRemoteFragment() have
-  /// returned, successfully or not. Initialised during Exec().
-  boost::scoped_ptr<CountingBarrier> exec_complete_barrier_;
+  /// EXECUTING: in-flight; the only non-terminal state
+  /// RETURNED_RESULTS: GetNext() set eos to true, or for DML, the request is complete
+  /// CANCELLED: Cancel() was called (not: someone called CancelBackends())
+  /// ERROR: received an error from a backend
+  enum class ExecState {
+    EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR
+  };
+  ExecState exec_state_ = ExecState::EXECUTING;
+
+  /// Overall execution status; only set on exec_state_ transitions:
+  /// - EXECUTING: OK
+  /// - RETURNED_RESULTS: OK
+  /// - CANCELLED: CANCELLED
+  /// - ERROR: error status
+  Status exec_status_;
 
   /// Protects filter_routing_table_.
   SpinLock filter_lock_;
@@ -320,12 +304,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// safe to concurrently read from filter_routing_table_.
   bool filter_routing_table_complete_ = false;
 
-  /// True if and only if ReleaseExecResources() has been called.
-  bool released_exec_resources_ = false;
-
-  /// True if and only if ReleaseAdmissionControlResources() has been called.
-  bool released_admission_control_resources_ = false;
-
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
 
@@ -333,36 +311,67 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// HDFS INSERT finalization is not required.
   const TFinalizeParams* finalize_params() const;
 
-  const TQueryCtx& query_ctx() const { return schedule_.request().query_ctx; }
+  const TQueryCtx& query_ctx() const;
 
-  /// Only valid *after* calling Exec(). Return nullptr if the running query does not
-  /// produce any rows.
-  RuntimeState* runtime_state();
+  const TUniqueId& query_id() const;
 
   /// Returns a pretty-printed table of the current filter state.
   std::string FilterDebugString();
 
-  /// Cancels all fragment instances. Assumes that lock_ is held. This may be called when
-  /// the query is not being cancelled in the case where the query limit is reached.
-  void CancelInternal();
-
-  /// Acquires lock_ and updates query_status_ with 'status' if it's not already
-  /// an error status, and returns the current query_status_. The status may be
-  /// due to an error in a specific fragment instance, or it can be a general error
-  /// not tied to a specific fragment instance.
-  /// Calls CancelInternal() when switching to an error status.
-  /// When an error is due to a specific fragment instance, 'is_fragment_failure' must
-  /// be true and 'failed_fragment' is the fragment_id that has failed, used for error
-  /// reporting. For a general error not tied to a specific instance,
-  /// 'is_fragment_failure' must be false and 'failed_fragment' will be ignored.
-  /// 'backend_hostname' is used for error reporting in either case.
-  Status UpdateStatus(const Status& status, const std::string& backend_hostname,
-      bool is_fragment_failure, const TUniqueId& failed_fragment) WARN_UNUSED_RESULT;
-
-  /// Returns only when either all execution backends have reported success or the query
-  /// is in error. Returns the status of the query.
-  /// It is safe to call this concurrently, but any calls must be made only after Exec().
-  Status WaitForBackendCompletion() WARN_UNUSED_RESULT;
+  /// Called when the query is done executing due to reaching EOS or client
+  /// cancellation. If 'exec_state_' != EXECUTING, does nothing. Otherwise sets
+  /// 'exec_state_' to 'state' (must be either CANCELLED or RETURNED_RESULTS), and
+  /// finalizes execution (cancels remaining backends if transitioning to CANCELLED;
+  /// either way, calls ComputeQuerySummary() and releases resources). Returns the
+  /// resulting overall execution status.
+  Status SetNonErrorTerminalState(const ExecState state) WARN_UNUSED_RESULT;
+
+  /// Transitions 'exec_state_' given an execution status and returns the resulting
+  /// overall status:
+  ///
+  /// - if the 'status' parameter is ok, no state transition
+  /// - if 'exec_state_' is EXECUTING and 'status' is not ok, transitions to ERROR
+  /// - if 'exec_state_' is already RETURNED_RESULTS, CANCELLED, or ERROR: does not
+  ///   transition state (those are terminal states) however in the case of ERROR,
+  ///   status may be updated to a more interesting status.
+  ///
+  /// Should not be called for (client initiated) cancellation. Call
+  /// SetNonErrorTerminalState(CANCELLED) instead.
+  ///
+  /// 'failed_finstance' is the fragment instance id that has failed (or nullptr if the
+  /// failure is not specific to a fragment instance), used for error reporting along
+  /// with 'instance_hostname'.
+  Status UpdateExecState(const Status& status, const TUniqueId* failed_finstance,
+      const string& instance_hostname) WARN_UNUSED_RESULT;
+
+  /// Helper for SetNonErrorTerminalState() and UpdateExecStateIfError(). If the caller
+  /// transitioned to a terminal state (which happens exactly once for the lifetime of
+  /// the Coordinator object), then finalizes execution (cancels remaining backends if
+  /// transitioning to CANCELLED; in all cases releases resources and calls
+  /// ComputeQuerySummary().
+  void HandleExecStateTransition(const ExecState old_state, const ExecState new_state);
+
+  /// Return true if 'exec_state_' is RETURNED_RESULTS.
+  /// TODO: remove with IMPALA-6984.
+  bool ReturnedAllResults() WARN_UNUSED_RESULT;
+
+  /// Return the string representation of 'state'.
+  static const char* ExecStateToString(const ExecState state);
+
+  // For DCHECK_EQ, etc of ExecState values.
+  friend std::ostream& operator<<(std::ostream& o, const ExecState s) {
+    return o << ExecStateToString(s);
+  }
+
+  /// Helper for HandleExecStateTransition(). Sends cancellation request to all
+  /// executing backends but does not wait for acknowledgement from the backends. The
+  /// ExecState state-machine ensures this is called at most once.
+  void CancelBackends();
+
+  /// Returns only when either all execution backends have reported success or a request
+  /// to cancel the backends has already been sent. It is safe to call this concurrently,
+  /// but any calls must be made only after Exec().
+  void WaitForBackends();
 
   /// Initializes fragment_stats_ and query_profile_. Must be called before
   /// InitBackendStates().
@@ -384,36 +393,33 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// finishing the INSERT in flight.
   Status FinalizeHdfsInsert() WARN_UNUSED_RESULT;
 
-  /// Populates backend_states_, starts query execution at all backends in parallel, and
-  /// blocks until startup completes.
+  /// Helper for Exec(). Populates backend_states_, starts query execution at all
+  /// backends in parallel, and blocks until startup completes.
   void StartBackendExec();
 
-  /// Calls CancelInternal() and returns an error if there was any error starting
-  /// backend execution.
-  /// Also updates query_profile_ with the startup latency histogram.
+  /// Helper for Exec(). Checks for errors encountered when starting backend execution,
+  /// using any non-OK status, if any, as the overall status. Returns the overall
+  /// status. Also updates query_profile_ with the startup latency histogram.
   Status FinishBackendStartup() WARN_UNUSED_RESULT;
 
   /// Build the filter routing table by iterating over all plan nodes and collecting the
   /// filters that they either produce or consume.
   void InitFilterRoutingTable();
 
-  /// Releases all resources associated with query execution. Acquires lock_. Idempotent.
+  /// Helper for HandleExecStateTransition(). Releases all resources associated with
+  /// query execution. The ExecState state-machine ensures this is called exactly once.
   void ReleaseExecResources();
 
-  /// Same as ReleaseExecResources() except the lock must be held by the caller.
-  void ReleaseExecResourcesLocked();
-
-  /// Releases admission control resources for use by other queries.
-  /// This should only be called if one of following preconditions is satisfied for each
-  /// backend on which the query is executing:
-  /// * The backend finished execution.
-  ///   Rationale: the backend isn't consuming resources.
-  //
+  /// Helper for HandleExecStateTransition(). Releases admission control resources for
+  /// use by other queries. This should only be called if one of following
+  /// preconditions is satisfied for each backend on which the query is executing:
+  ///
+  /// * The backend finished execution.  Rationale: the backend isn't consuming
+  ///   resources.
   /// * A cancellation RPC was delivered to the backend.
   ///   Rationale: the backend will be cancelled and release resources soon. By the
   ///   time a newly admitted query fragment starts up on the backend and starts consuming
   ///   resources, the resources from this query will probably have been released.
-  //
   /// * Sending the cancellation RPC to the backend failed
   ///   Rationale: the backend is either down or will tear itself down when it next tries
   ///   to send a status RPC to the coordinator. It's possible that the fragment will be
@@ -424,16 +430,13 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   ///   pessimistically queueing queries while we wait for a response from a backend that
   ///   may never come.
   ///
-  /// Calling WaitForBackendCompletion() or CancelInternal() before this function is
-  /// sufficient to satisfy the above preconditions. If the query has an expensive
-  /// finalization step post query execution (e.g. a DML statement), then this should
-  /// be called after that completes to avoid over-admitting queries.
+  /// Calling WaitForBackends() or CancelBackends() before this function is sufficient
+  /// to satisfy the above preconditions. If the query has an expensive finalization
+  /// step post query execution (e.g. a DML statement), then this should be called
+  /// after that completes to avoid over-admitting queries.
   ///
-  /// Acquires lock_. Idempotent.
+  /// The ExecState state-machine ensures this is called exactly once.
   void ReleaseAdmissionControlResources();
-
-  /// Same as ReleaseAdmissionControlResources() except lock must be held by caller.
-  void ReleaseAdmissionControlResourcesLocked();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/da329442/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 12b9b78..2ca1256 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -904,7 +904,7 @@ Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) {
 
   // Cancel the parent query. 'lock_' should not be held because cancellation involves
   // RPCs and can block for a long time.
-  if (coord != NULL) coord->Cancel(cause);
+  if (coord != NULL) coord->Cancel();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/da329442/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index fb3f261..3af4c9b 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -111,11 +111,6 @@ class ClientRequestState;
 /// 6. ClientRequestState::expiration_data_lock_
 /// 7. Coordinator::exec_summary_lock
 ///
-/// Coordinator::lock_ should not be acquired at the same time as the
-/// ImpalaServer/SessionState/ClientRequestState locks. Aside from
-/// Coordinator::exec_summary_lock_ the Coordinator's lock ordering is independent of
-/// the above lock ordering.
-///
 /// The following locks are not held in conjunction with other locks:
 /// * query_log_lock_
 /// * session_timeout_lock_

http://git-wip-us.apache.org/repos/asf/impala/blob/da329442/be/src/util/counting-barrier.h
----------------------------------------------------------------------
diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h
index 49b0bde..827c526 100644
--- a/be/src/util/counting-barrier.h
+++ b/be/src/util/counting-barrier.h
@@ -33,8 +33,23 @@ class CountingBarrier {
   }
 
   /// Sends one notification, decrementing the number of pending notifications by one.
-  void Notify() {
-    if (count_.Add(-1) == 0) promise_.Set(true);
+  /// Returns the remaining pending notifications.
+  int32_t Notify() {
+    int32_t result = count_.Add(-1);
+    if (result == 0) promise_.Set(true);
+    return result;
+  }
+
+  /// Sets the number of pending notifications to 0 and unblocks Wait().
+  void NotifyRemaining() {
+    while (true) {
+      int32_t value = count_.Load();
+      if (value <= 0) return;  // count_ can legitimately drop below 0
+      if (count_.CompareAndSwap(value, 0)) {
+        promise_.Set(true);
+        return;
+      }
+    }
   }
 
   /// Blocks until all notifications are received.
@@ -44,6 +59,8 @@ class CountingBarrier {
   /// case '*timed_out' will be true.
   void Wait(int64_t timeout_ms, bool* timed_out) { promise_.Get(timeout_ms, timed_out); }
 
+  int32_t pending() const { return count_.Load(); }
+
  private:
   /// Used to signal waiters when all notifications are received.
   Promise<bool> promise_;
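
For illustration only (not part of the commit): the NotifyRemaining() loop
above, restated over std::atomic in place of Impala's AtomicInt32, with a
printf standing in for promise_.Set(true). The CAS retry loop guarantees at
most one caller swaps the count to zero and therefore fulfils the promise.

#include <atomic>
#include <cstdint>
#include <cstdio>

void NotifyRemaining(std::atomic<int32_t>& count) {
  while (true) {
    int32_t value = count.load();
    if (value <= 0) return;  // count can legitimately drop below 0
    if (count.compare_exchange_strong(value, 0)) {
      std::printf("barrier released\n");  // models promise_.Set(true)
      return;
    }
  }
}

int main() {
  std::atomic<int32_t> pending{5};
  NotifyRemaining(pending);  // unblocks waiters without 5 separate notifies
  NotifyRemaining(pending);  // second call is a no-op
}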
