IMPALA-3748: add query-wide resource acquisition step

This adds a Prepare() method to QueryState that allows
query execution to fail up-front if resources cannot be
acquired. We don't make full use of this yet, but it
provides a suitable place to acquire a memory reservation
for the query or fail cleanly.

The process memory limit check is moved here so that all
startup memory checks will eventually be consolidated in
one place.

Also switch a boost::mutex to a SpinLock for consistency
and to improve performance (see lock-benchmark.cc).

Change-Id: Ia21a3c0f0b0a7175116883ef9871b93c8ce8bb81
Reviewed-on: http://gerrit.cloudera.org:8080/5739
Tested-by: Impala Public Jenkins
Reviewed-by: Tim Armstrong <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cb52b2b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cb52b2b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cb52b2b8

Branch: refs/heads/master
Commit: cb52b2b8aebfe24e09743fca3c32936fa85105d2
Parents: 4b486b0
Author: Tim Armstrong <[email protected]>
Authored: Mon Jan 16 13:44:46 2017 -0800
Committer: Tim Armstrong <[email protected]>
Committed: Tue Jan 24 18:49:24 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/query-exec-mgr.cc | 17 ++++++-----------
 be/src/runtime/query-exec-mgr.h  |  4 ++--
 be/src/runtime/query-state.cc    | 33 ++++++++++++++++++++++++++++++---
 be/src/runtime/query-state.h     | 22 +++++++++++++++++++---
 be/src/runtime/test-env.cc       |  1 +
 5 files changed, 58 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb52b2b8/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 5990a24..c38f478 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -45,22 +45,17 @@ Status QueryExecMgr::StartFInstance(const TExecPlanFragmentParams& params) {
   VLOG_QUERY << "StartFInstance() instance_id=" << PrintId(instance_id)
              << " coord=" << params.query_ctx.coord_address;
 
-  // Starting a new fragment instance creates a thread and consumes a non-trivial
-  // amount of memory. If we are already starved for memory, cancel the instance as
-  // early as possible to avoid digging the hole deeper.
-  MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
-  if (process_mem_tracker->LimitExceeded()) {
-    string msg = Substitute("Instance $0 of plan fragment $1 could not "
-        "start because the backend Impala daemon is over its memory limit",
-        PrintId(instance_id), params.fragment_ctx.fragment.display_name);
-    return process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
-  }
-
   bool dummy;
   QueryState* qs = GetOrCreateQueryState(
       params.query_ctx, params.fragment_instance_ctx.request_pool, &dummy);
   DCHECK(params.__isset.fragment_ctx);
   DCHECK(params.__isset.fragment_instance_ctx);
+  Status status = qs->Prepare();
+  if (!status.ok()) {
+    ReleaseQueryState(qs);
+    return status;
+  }
+
   FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(
       qs, params.fragment_ctx, params.fragment_instance_ctx, params.query_ctx.desc_tbl));
   // register instance before returning so that async Cancel() calls can

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb52b2b8/be/src/runtime/query-exec-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
index 37abb85..92e756f 100644
--- a/be/src/runtime/query-exec-mgr.h
+++ b/be/src/runtime/query-exec-mgr.h
@@ -56,8 +56,8 @@ class QueryExecMgr {
   /// return value of this function.
   Status StartFInstance(const TExecPlanFragmentParams& params);
 
-  /// Creates and the QueryState for the given query with the provided parameters. Only
-  /// valid to call if the QueryState does not already exist. The caller must call
+  /// Creates a QueryState for the given query with the provided parameters. Only valid
+  /// to call if the QueryState does not already exist. The caller must call
   /// ReleaseQueryState() with the returned QueryState to decrement the refcount.
   QueryState* CreateQueryState(
       const TQueryCtx& query_ctx, const std::string& request_pool);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb52b2b8/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 7cc3396..7757936 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -39,7 +39,7 @@ QueryState::ScopedRef::~ScopedRef() {
 }
 
 QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool)
-  : query_ctx_(query_ctx), refcnt_(0), released_resources_(false) {
+  : query_ctx_(query_ctx), refcnt_(0), prepared_(false), released_resources_(false) {
   TQueryOptions& query_options = query_ctx_.client_request.query_options;
   // max_errors does not indicate how many errors in total have been recorded, but rather
   // how many are distinct. It is defined as the sum of the number of generic errors and
@@ -63,6 +63,33 @@ QueryState::~QueryState() {
   DCHECK(released_resources_);
 }
 
+Status QueryState::Prepare() {
+  lock_guard<SpinLock> l(prepare_lock_);
+  if (prepared_) {
+    DCHECK(prepare_status_.ok());
+    return Status::OK();
+  }
+  RETURN_IF_ERROR(prepare_status_);
+
+  Status status;
+  // Starting a new query creates threads and consumes a non-trivial amount of memory.
+  // If we are already starved for memory, fail as early as possible to avoid consuming
+  // more resources.
+  MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
+  if (process_mem_tracker->LimitExceeded()) {
+    string msg = Substitute("Query $0 could not start because the backend Impala daemon "
+                            "is over its memory limit",
+        PrintId(query_id()));
+    prepare_status_ = process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
+    return prepare_status_;
+  }
+
+  // TODO: IMPALA-3748: acquire minimum buffer reservation at this point.
+
+  prepared_ = true;
+  return Status::OK();
+}
+
 void QueryState::InitMemTrackers(const std::string& pool) {
   int64_t bytes_limit = -1;
   if (query_options().__isset.mem_limit && query_options().mem_limit > 0) {
@@ -76,14 +103,14 @@ void QueryState::InitMemTrackers(const std::string& pool) {
 
 void QueryState::RegisterFInstance(FragmentInstanceState* fis) {
   VLOG_QUERY << "RegisterFInstance(): instance_id=" << PrintId(fis->instance_id());
-  lock_guard<mutex> l(fis_map_lock_);
+  lock_guard<SpinLock> l(fis_map_lock_);
   DCHECK_EQ(fis_map_.count(fis->instance_id()), 0);
   fis_map_.insert(make_pair(fis->instance_id(), fis));
 }
 
 FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) {
   VLOG_FILE << "GetFInstanceState(): instance_id=" << PrintId(instance_id);
-  lock_guard<mutex> l(fis_map_lock_);
+  lock_guard<SpinLock> l(fis_map_lock_);
   auto it = fis_map_.find(instance_id);
   return it != fis_map_.end() ? it->second : nullptr;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb52b2b8/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 685e82a..c381dfe 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -21,12 +21,12 @@
 #include <memory>
 #include <unordered_map>
 #include <boost/scoped_ptr.hpp>
-#include <boost/thread/mutex.hpp>
 
 #include "common/atomic.h"
 #include "common/object-pool.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
+#include "util/spinlock.h"
 #include "util/uid-util.h"
 
 namespace impala {
@@ -95,6 +95,11 @@ class QueryState {
 
   MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
 
+  /// Sets up state required for fragment execution: memory reservations, etc. Fails
+  /// if resources could not be acquired. Safe to call concurrently and idempotent:
+  /// the first thread to call this does the setup work.
+  Status Prepare();
+
   /// Registers a new FInstanceState.
   void RegisterFInstance(FragmentInstanceState* fis);
 
@@ -118,10 +123,21 @@ class QueryState {
   ObjectPool obj_pool_;
   AtomicInt32 refcnt_;
 
+  /// Held for duration of Prepare(). Protects 'prepared_',
+  /// 'prepare_status_' and the members initialized in Prepare().
+  SpinLock prepare_lock_;
+
+  /// Non-OK if Prepare() failed the first time it was called.
+  /// All subsequent calls to Prepare() return this status.
+  Status prepare_status_;
+
+  /// True if Prepare() executed and finished successfully.
+  bool prepared_;
+
   /// True if and only if ReleaseResources() has been called.
   bool released_resources_;
 
-  boost::mutex fis_map_lock_; // protects fis_map_
+  SpinLock fis_map_lock_; // protects fis_map_
 
   /// map from instance id to its state (owned by obj_pool_)
   std::unordered_map<TUniqueId, FragmentInstanceState*> fis_map_;
@@ -133,7 +149,7 @@ class QueryState {
   /// The query is associated with the resource pool named 'pool'
   QueryState(const TQueryCtx& query_ctx, const std::string& pool);
 
-  /// Called from constructor to initialize MemTrackers.
+  /// Called from Prepare() to initialize MemTrackers.
   void InitMemTrackers(const std::string& pool);
 };
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb52b2b8/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 4f9e8f5..1c28acd 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -103,6 +103,7 @@ Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_si
   // CreateQueryState() enforces the invariant that 'query_id' must be unique.
   QueryState* qs = exec_env_->query_exec_mgr()->CreateQueryState(query_ctx, "test-pool");
   query_states_.push_back(qs);
+  RETURN_IF_ERROR(qs->Prepare());
   FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(
       qs, TPlanFragmentCtx(), TPlanFragmentInstanceCtx(), TDescriptorTable()));
   RuntimeState* rs = qs->obj_pool()->Add(

Reply via email to