IMPALA-3748: add query-wide resource acquisition step. This adds a Prepare() method to QueryState that allows query execution to fail up front if resources cannot be acquired. We don't make full use of this yet, but it provides a suitable place to acquire a memory reservation for the query or fail cleanly.
The process memory limit check is moved here so that all startup memory checks will eventually be consolidated in one place. Also switch a boost::mutex to a SpinLock for consistency and to improve performance (see lock-benchmark.cc). Change-Id: Ia21a3c0f0b0a7175116883ef9871b93c8ce8bb81 Reviewed-on: http://gerrit.cloudera.org:8080/5739 Tested-by: Impala Public Jenkins Reviewed-by: Tim Armstrong <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cb52b2b8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cb52b2b8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cb52b2b8 Branch: refs/heads/master Commit: cb52b2b8aebfe24e09743fca3c32936fa85105d2 Parents: 4b486b0 Author: Tim Armstrong <[email protected]> Authored: Mon Jan 16 13:44:46 2017 -0800 Committer: Tim Armstrong <[email protected]> Committed: Tue Jan 24 18:49:24 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/query-exec-mgr.cc | 17 ++++++----------- be/src/runtime/query-exec-mgr.h | 4 ++-- be/src/runtime/query-state.cc | 33 ++++++++++++++++++++++++++++++--- be/src/runtime/query-state.h | 22 +++++++++++++++++++--- be/src/runtime/test-env.cc | 1 + 5 files changed, 58 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb52b2b8/be/src/runtime/query-exec-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc index 5990a24..c38f478 100644 --- a/be/src/runtime/query-exec-mgr.cc +++ b/be/src/runtime/query-exec-mgr.cc @@ -45,22 +45,17 @@ Status QueryExecMgr::StartFInstance(const TExecPlanFragmentParams& params) { VLOG_QUERY << "StartFInstance() instance_id=" << PrintId(instance_id) << " coord=" << 
params.query_ctx.coord_address; - // Starting a new fragment instance creates a thread and consumes a non-trivial - // amount of memory. If we are already starved for memory, cancel the instance as - // early as possible to avoid digging the hole deeper. - MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker(); - if (process_mem_tracker->LimitExceeded()) { - string msg = Substitute("Instance $0 of plan fragment $1 could not " - "start because the backend Impala daemon is over its memory limit", - PrintId(instance_id), params.fragment_ctx.fragment.display_name); - return process_mem_tracker->MemLimitExceeded(NULL, msg, 0); - } - bool dummy; QueryState* qs = GetOrCreateQueryState( params.query_ctx, params.fragment_instance_ctx.request_pool, &dummy); DCHECK(params.__isset.fragment_ctx); DCHECK(params.__isset.fragment_instance_ctx); + Status status = qs->Prepare(); + if (!status.ok()) { + ReleaseQueryState(qs); + return status; + } + FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState( qs, params.fragment_ctx, params.fragment_instance_ctx, params.query_ctx.desc_tbl)); // register instance before returning so that async Cancel() calls can http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb52b2b8/be/src/runtime/query-exec-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h index 37abb85..92e756f 100644 --- a/be/src/runtime/query-exec-mgr.h +++ b/be/src/runtime/query-exec-mgr.h @@ -56,8 +56,8 @@ class QueryExecMgr { /// return value of this function. Status StartFInstance(const TExecPlanFragmentParams& params); - /// Creates and the QueryState for the given query with the provided parameters. Only - /// valid to call if the QueryState does not already exist. The caller must call + /// Creates a QueryState for the given query with the provided parameters. 
Only valid + /// to call if the QueryState does not already exist. The caller must call /// ReleaseQueryState() with the returned QueryState to decrement the refcount. QueryState* CreateQueryState( const TQueryCtx& query_ctx, const std::string& request_pool); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb52b2b8/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 7cc3396..7757936 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -39,7 +39,7 @@ QueryState::ScopedRef::~ScopedRef() { } QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool) - : query_ctx_(query_ctx), refcnt_(0), released_resources_(false) { + : query_ctx_(query_ctx), refcnt_(0), prepared_(false), released_resources_(false) { TQueryOptions& query_options = query_ctx_.client_request.query_options; // max_errors does not indicate how many errors in total have been recorded, but rather // how many are distinct. It is defined as the sum of the number of generic errors and @@ -63,6 +63,33 @@ QueryState::~QueryState() { DCHECK(released_resources_); } +Status QueryState::Prepare() { + lock_guard<SpinLock> l(prepare_lock_); + if (prepared_) { + DCHECK(prepare_status_.ok()); + return Status::OK(); + } + RETURN_IF_ERROR(prepare_status_); + + Status status; + // Starting a new query creates threads and consumes a non-trivial amount of memory. + // If we are already starved for memory, fail as early as possible to avoid consuming + // more resources. 
+ MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker(); + if (process_mem_tracker->LimitExceeded()) { + string msg = Substitute("Query $0 could not start because the backend Impala daemon " + "is over its memory limit", + PrintId(query_id())); + prepare_status_ = process_mem_tracker->MemLimitExceeded(NULL, msg, 0); + return prepare_status_; + } + + // TODO: IMPALA-3748: acquire minimum buffer reservation at this point. + + prepared_ = true; + return Status::OK(); +} + void QueryState::InitMemTrackers(const std::string& pool) { int64_t bytes_limit = -1; if (query_options().__isset.mem_limit && query_options().mem_limit > 0) { @@ -76,14 +103,14 @@ void QueryState::InitMemTrackers(const std::string& pool) { void QueryState::RegisterFInstance(FragmentInstanceState* fis) { VLOG_QUERY << "RegisterFInstance(): instance_id=" << PrintId(fis->instance_id()); - lock_guard<mutex> l(fis_map_lock_); + lock_guard<SpinLock> l(fis_map_lock_); DCHECK_EQ(fis_map_.count(fis->instance_id()), 0); fis_map_.insert(make_pair(fis->instance_id(), fis)); } FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) { VLOG_FILE << "GetFInstanceState(): instance_id=" << PrintId(instance_id); - lock_guard<mutex> l(fis_map_lock_); + lock_guard<SpinLock> l(fis_map_lock_); auto it = fis_map_.find(instance_id); return it != fis_map_.end() ? 
it->second : nullptr; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb52b2b8/be/src/runtime/query-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h index 685e82a..c381dfe 100644 --- a/be/src/runtime/query-state.h +++ b/be/src/runtime/query-state.h @@ -21,12 +21,12 @@ #include <memory> #include <unordered_map> #include <boost/scoped_ptr.hpp> -#include <boost/thread/mutex.hpp> #include "common/atomic.h" #include "common/object-pool.h" #include "gen-cpp/ImpalaInternalService_types.h" #include "gen-cpp/Types_types.h" +#include "util/spinlock.h" #include "util/uid-util.h" namespace impala { @@ -95,6 +95,11 @@ class QueryState { MemTracker* query_mem_tracker() const { return query_mem_tracker_; } + /// Sets up state required for fragment execution: memory reservations, etc. Fails + /// if resources could not be acquired. Safe to call concurrently and idempotent: + /// the first thread to call this does the setup work. + Status Prepare(); + /// Registers a new FInstanceState. void RegisterFInstance(FragmentInstanceState* fis); @@ -118,10 +123,21 @@ class QueryState { ObjectPool obj_pool_; AtomicInt32 refcnt_; + /// Held for duration of Prepare(). Protects 'prepared_', + /// 'prepare_status_' and the members initialized in Prepare(). + SpinLock prepare_lock_; + + /// Non-OK if Prepare() failed the first time it was called. + /// All subsequent calls to Prepare() return this status. + Status prepare_status_; + + /// True if Prepare() executed and finished successfully. + bool prepared_; + /// True if and only if ReleaseResources() has been called. 
bool released_resources_; - boost::mutex fis_map_lock_; // protects fis_map_ + SpinLock fis_map_lock_; // protects fis_map_ /// map from instance id to its state (owned by obj_pool_) std::unordered_map<TUniqueId, FragmentInstanceState*> fis_map_; @@ -133,7 +149,7 @@ class QueryState { /// The query is associated with the resource pool named 'pool' QueryState(const TQueryCtx& query_ctx, const std::string& pool); - /// Called from constructor to initialize MemTrackers. + /// Called from Prepare() to initialize MemTrackers. void InitMemTrackers(const std::string& pool); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb52b2b8/be/src/runtime/test-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc index 4f9e8f5..1c28acd 100644 --- a/be/src/runtime/test-env.cc +++ b/be/src/runtime/test-env.cc @@ -103,6 +103,7 @@ Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_si // CreateQueryState() enforces the invariant that 'query_id' must be unique. QueryState* qs = exec_env_->query_exec_mgr()->CreateQueryState(query_ctx, "test-pool"); query_states_.push_back(qs); + RETURN_IF_ERROR(qs->Prepare()); FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState( qs, TPlanFragmentCtx(), TPlanFragmentInstanceCtx(), TDescriptorTable())); RuntimeState* rs = qs->obj_pool()->Add(
