IMPALA-4890/5143: Coordinator race involving TearDown()

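TearDown() both releases resources and destroys control
structures (the QueryState reference). It can be called
while a concurrent thread is still executing Exec(), or
before a thread that later calls GetNext(); either thread
would then access control structures that have already been
destroyed. The fix is to replace TearDown() with
ReleaseResources(), which releases resources but leaves the
control structures intact. The QueryState reference is now
released in the Coordinator destructor.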

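This change also releases resources automatically at the end
of query execution, i.e. when the query is cancelled or when
GetNext() reaches eos, so ClientRequestState::Done() no
longer needs to call TearDown() explicitly.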

Change-Id: I457a6424a0255c137336c4bc01a6e7ed830d18c7
Reviewed-on: http://gerrit.cloudera.org:8080/6897
Reviewed-by: Marcel Kornacker <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/bad10da4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/bad10da4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/bad10da4

Branch: refs/heads/master
Commit: bad10da4a6eb13673aa44eb4d45d6ad87a2dd690
Parents: 5d59d85
Author: Marcel Kornacker <[email protected]>
Authored: Sun May 14 21:32:41 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri May 26 13:45:42 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc          | 55 +++++++++++++----------------
 be/src/runtime/coordinator.h           | 36 ++++++++-----------
 be/src/service/client-request-state.cc |  3 --
 3 files changed, 38 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bad10da4/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 741da32..5b86690 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -108,7 +108,11 @@ Coordinator::Coordinator(
     query_events_(events) {}
 
 Coordinator::~Coordinator() {
-  DCHECK(torn_down_) << "TearDown() must be called before Coordinator is 
destroyed";
+  DCHECK(released_resources_)
+      << "ReleaseResources() must be called before Coordinator is destroyed";
+  if (query_state_ != nullptr) {
+    ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
+  }
 }
 
 Status Coordinator::Exec() {
@@ -416,8 +420,6 @@ Status Coordinator::FinishBackendStartup() {
       "Backend startup latencies", latencies.ToHumanReadable());
 
   if (!status.ok()) {
-    // TODO: do not allow cancellation via the debug page until Exec() has 
returned
-    //DCHECK(query_status_.ok()); // nobody should have been able to cancel
     query_status_ = status;
     CancelInternal();
   }
@@ -881,19 +883,14 @@ Status Coordinator::GetNext(QueryResultSet* results, int 
max_rows, bool* eos) {
 
   if (*eos) {
     returned_all_results_ = true;
-    // Trigger tear-down of coordinator fragment by closing the consumer. Must 
do before
-    // WaitForBackendCompletion().
-    coord_sink_->CloseConsumer();
-    coord_sink_ = nullptr;
-
-    // Don't return final NULL until all instances have completed.  GetNext 
must wait for
-    // all instances to complete before ultimately signalling the end of 
execution via a
-    // NULL batch. After NULL is returned, the coordinator may tear down query 
state, and
-    // perform post-query finalization which might depend on the reports from 
all
-    // backends.
-    //
-    // TODO: Waiting should happen in TearDown() (and then we wouldn't need to 
call
-    // CloseConsumer() here). See IMPALA-4275 for details.
+    // release resources here, since we won't be fetching more result rows
+    {
+      lock_guard<mutex> l(lock_);
+      ReleaseResources();
+    }
+
+    // wait for all backends to complete before computing the summary
+    // TODO: relocate this so GetNext() won't have to wait for backends to 
complete?
     RETURN_IF_ERROR(WaitForBackendCompletion());
     // if the query completed successfully, compute the summary
     if (query_status_.ok()) ComputeQuerySummary();
@@ -912,12 +909,15 @@ void Coordinator::Cancel(const Status* cause) {
   // should explicitly pass Status::OK()). Fragment instances may be cancelled 
at the end
   // of a successful query. Need to clean up relationship between 
query_status_ here and
   // in QueryExecState. See IMPALA-4279.
-  query_status_ = (cause != NULL && !cause->ok()) ? *cause : Status::CANCELLED;
+  query_status_ = (cause != nullptr && !cause->ok()) ? *cause : 
Status::CANCELLED;
   CancelInternal();
 }
 
 void Coordinator::CancelInternal() {
   VLOG_QUERY << "Cancel() query_id=" << query_id();
+  // TODO: remove when restructuring cancellation, which should happen 
automatically
+  // as soon as the coordinator knows that the query is finished
+  DCHECK(!query_status_.ok());
 
   int num_cancelled = 0;
   for (BackendState* backend_state: backend_states_) {
@@ -931,6 +931,8 @@ void Coordinator::CancelInternal() {
 
   // Report the summary with whatever progress the query made before being 
cancelled.
   ComputeQuerySummary();
+
+  ReleaseResources();
 }
 
 Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& 
params) {
@@ -1076,11 +1078,9 @@ string Coordinator::GetErrorLog() {
   return PrintErrorMapToString(merged);
 }
 
-// TODO: call this as soon as it's clear that we won't reference the state
-// anymore, ie, in CancelInternal() and when GetNext() hits eos
-void Coordinator::TearDown() {
-  DCHECK(!torn_down_) << "Coordinator::TearDown() must not be called twice";
-  torn_down_ = true;
+void Coordinator::ReleaseResources() {
+  if (released_resources_) return;
+  released_resources_ = true;
   if (filter_routing_table_.size() > 0) {
     query_profile_->AddInfoString("Final filter table", FilterDebugString());
   }
@@ -1094,20 +1094,13 @@ void Coordinator::TearDown() {
   }
   // This may be NULL while executing UDFs.
   if (filter_mem_tracker_.get() != nullptr) {
+    // TODO: move this elsewhere, this isn't releasing resources (it's 
dismantling
+    // control structures)
     filter_mem_tracker_->UnregisterFromParent();
-    filter_mem_tracker_.reset();
   }
   // Need to protect against failed Prepare(), where root_sink() would not be 
set.
   if (coord_sink_ != nullptr) {
     coord_sink_->CloseConsumer();
-    coord_sink_ = nullptr;
-  }
-  coord_instance_ = nullptr;
-  if (query_state_ != nullptr) {
-    // Tear down the query state last - other members like 
'filter_mem_tracker_'
-    // may reference objects with query lifetime, like the query MemTracker.
-    ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
-    query_state_ = nullptr;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bad10da4/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index bc635b1..0d772eb 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -75,7 +75,10 @@ class QueryState;
 /// Query coordinator: handles execution of fragment instances on remote 
nodes, given a
 /// TQueryExecRequest. As part of that, it handles all interactions with the 
executing
 /// backends; it is also responsible for implementing all client requests 
regarding the
-/// query, including cancellation.
+/// query, including cancellation. Once a query ends, either through 
cancellation or
+/// by returning eos, the coordinator releases resources. (Note that DML 
requests
+/// always end with cancellation, via ImpalaServer::UnregisterQuery()/
+/// ImpalaServer::CancelInternal()/ClientRequestState::Cancel().)
 ///
 /// The coordinator monitors the execution status of fragment instances and 
aborts the
 /// entire query if an error is reported by any of them.
@@ -84,10 +87,7 @@ class QueryState;
 /// rows are produced by a fragment instance that always executes on the same 
machine as
 /// the coordinator.
 ///
-/// Once a query has finished executing and all results have been returned 
either to the
-/// caller of GetNext() or a data sink, execution_completed() will return 
true. If the
-/// query is aborted, execution_completed should also be set to true. 
Coordinator is
-/// thread-safe, with the exception of GetNext().
+/// Thread-safe, with the exception of GetNext().
 //
 /// A typical sequence of calls for a single query (calls under the same 
numbered
 /// item can happen concurrently):
@@ -98,9 +98,6 @@ class QueryState;
 /// The implementation ensures that setting an overall error status and 
initiating
 /// cancellation of all fragment instances is atomic.
 ///
-/// TODO: remove TearDown() and replace with ReleaseResources(); TearDown() 
currently
-/// also disassembles the control structures (such as the local reference to 
the
-/// coordinator's FragmentInstanceState)
 /// TODO: move into separate subdirectory and move nested classes into 
separate files
 /// and unnest them
 /// TODO: clean up locking behavior; in particular, clarify dependency on lock_
@@ -143,7 +140,7 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// if any, as well as all plan fragments on remote nodes. Sets 
query_status_ to the
   /// given cause if non-NULL. Otherwise, sets query_status_ to 
Status::CANCELLED.
   /// Idempotent.
-  void Cancel(const Status* cause = NULL);
+  void Cancel(const Status* cause = nullptr);
 
   /// Updates execution status of a particular backend as well as 
Insert-related
   /// status (per_partition_status_ and files_to_move_). Also updates
@@ -151,13 +148,8 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
       WARN_UNUSED_RESULT;
 
-  /// Returns the query state.
-  /// Only valid to call after Exec() and before TearDown(). The returned
-  /// reference only remains valid until TearDown() is called.
-  QueryState* query_state() const {
-    DCHECK(!torn_down_);
-    return query_state_;
-  }
+  /// Only valid to call after Exec().
+  QueryState* query_state() const { return query_state_; }
 
   /// Only valid *after* calling Exec(). Return nullptr if the running query 
does not
   /// produce any rows.
@@ -207,10 +199,6 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// filter to fragment instances.
   void UpdateFilter(const TUpdateFilterParams& params);
 
-  /// Called once query execution is complete to tear down any remaining state.
-  /// TODO: change to ReleaseResources() and don't tear down control 
structures.
-  void TearDown();
-
  private:
   class BackendState;
   struct FilterTarget;
@@ -368,8 +356,8 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// safe to concurrently read from filter_routing_table_.
   bool filter_routing_table_complete_ = false;
 
-  /// True if and only if TearDown() has been called.
-  bool torn_down_ = false;
+  /// True if and only if ReleaseResources() has been called.
+  bool released_resources_ = false;
 
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
@@ -447,6 +435,10 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// Build the filter routing table by iterating over all plan nodes and 
collecting the
   /// filters that they either produce or consume.
   void InitFilterRoutingTable();
+
+  /// Releases filter resources, unregisters the filter mem tracker, and calls
+  /// CloseConsumer() on coord_sink_. Requires lock_ to be held. Idempotent.
+  void ReleaseResources();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bad10da4/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index 42e8de3..dcea8cb 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -446,8 +446,6 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
     lock_guard<mutex> l(lock_);
     // Don't start executing the query if Cancel() was called concurrently 
with Exec().
     if (is_cancelled_) return Status::CANCELLED;
-    // TODO: make schedule local to coordinator and move schedule_->Release() 
into
-    // Coordinator::TearDown()
     schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
         exec_request_.query_options, &summary_profile_, query_events_));
   }
@@ -585,7 +583,6 @@ void ClientRequestState::Done() {
                      << " because of error: " << status.GetDetail();
       }
     }
-    coord_->TearDown();
   }
 }
 

Reply via email to