http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/query-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h new file mode 100644 index 0000000..fe9cca2 --- /dev/null +++ b/be/src/runtime/query-state.h @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#ifndef IMPALA_RUNTIME_QUERY_STATE_H +#define IMPALA_RUNTIME_QUERY_STATE_H + +#include <boost/thread/mutex.hpp> +#include <unordered_map> + +#include "common/object-pool.h" +#include "gen-cpp/ImpalaInternalService_types.h" +#include "gen-cpp/Types_types.h" +#include "util/uid-util.h" +#include "common/atomic.h" + +namespace impala { + +class FragmentInstanceState; + +/// Central class for all backend execution state (example: the FragmentInstanceStates +/// of the individual fragment instances) created for a particular query. +/// This class contains or makes accessible state that is shared across fragment +/// instances; in contrast, fragment instance-specific state is collected in +/// FragmentInstanceState. +/// +/// The lifetime of an instance of this class is dictated by a reference count. 
+/// Any thread that executes on behalf of a query, and accesses any of its state, +/// must obtain a reference to the corresponding QueryState and hold it for at least the +/// duration of that access. The reference is obtained and released via +/// QueryExecMgr::Get-/ReleaseQueryState() or via QueryState::ScopedRef (the latter +/// for references limited to the scope of a single function or block). +/// As long as the reference count is greater than 0, all query state (contained +/// either in this class or accessible through this class, such as the +/// FragmentInstanceStates) is guaranteed to be alive. +/// +/// Thread-safe, unless noted otherwise. +class QueryState { + public: + /// Use this class to obtain a QueryState for the duration of a function/block, + /// rather than manually via QueryExecMgr::Get-/ReleaseQueryState(). + /// Pattern: + /// { + /// QueryState::ScopedRef qs(qid); + /// if (qs.get() == nullptr) <do something, such as return> + /// ... + /// } + class ScopedRef { + public: + explicit ScopedRef(const TUniqueId& query_id); + ~ScopedRef(); + + /// may return nullptr; check get() before dereferencing via operator-> + QueryState* get() const { return query_state_; } + QueryState* operator->() const { return query_state_; } + + private: + QueryState* query_state_; + DISALLOW_COPY_AND_ASSIGN(ScopedRef); + }; + + /// a shared pool for all objects that have query lifetime + ObjectPool* obj_pool() { return &obj_pool_; } + + /// This TQueryCtx was copied from the first fragment instance which led to the + /// creation of this QueryState. For all subsequently arriving fragment instances the + /// desc_tbl in this context will be incorrect, therefore query_ctx().desc_tbl should + /// not be used. This restriction will go away with the switch to a per-query exec + /// rpc. + const TQueryCtx& query_ctx() const { return query_ctx_; } + + const TUniqueId& query_id() const { return query_ctx_.query_id; } + + /// Registers a new FragmentInstanceState. 
+ void RegisterFInstance(FragmentInstanceState* fis); + + /// Returns the instance state or nullptr if the instance id has not previously + /// been registered. The returned FIS is valid for the duration of the QueryState. + FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id); + + private: + friend class QueryExecMgr; + friend class TestEnv; + + static const int DEFAULT_BATCH_SIZE = 1024; + + TQueryCtx query_ctx_; + + ObjectPool obj_pool_; + AtomicInt32 refcnt_; + + boost::mutex fis_map_lock_; // protects fis_map_ + + /// map from instance id to its state (owned by obj_pool_) + std::unordered_map<TUniqueId, FragmentInstanceState*> fis_map_; + + /// Create QueryState w/ copy of query_ctx and refcnt of 0. + explicit QueryState(const TQueryCtx& query_ctx); +}; + +} // namespace impala + +#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/runtime-filter-bank.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc index 53755f9..759b505 100644 --- a/be/src/runtime/runtime-filter-bank.cc +++ b/be/src/runtime/runtime-filter-bank.cc @@ -45,12 +45,12 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES); // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE - max_filter_size_ = query_ctx.request.query_options.runtime_filter_max_size; + max_filter_size_ = query_ctx.client_request.query_options.runtime_filter_max_size; max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE); max_filter_size_ = BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE)); - min_filter_size_ = query_ctx.request.query_options.runtime_filter_min_size; + min_filter_size_ = query_ctx.client_request.query_options.runtime_filter_min_size; min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE); min_filter_size_ = BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE)); @@ -61,7 +61,7 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s DCHECK_GT(min_filter_size_, 0); DCHECK_GT(max_filter_size_, 0); - default_filter_size_ = query_ctx.request.query_options.runtime_bloom_filter_size; + default_filter_size_ = query_ctx.client_request.query_options.runtime_bloom_filter_size; default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_); default_filter_size_ = BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/runtime-state.cc 
---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index a0c3335..5be4f1c 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -32,12 +32,14 @@ #include "exprs/expr.h" #include "exprs/scalar-fn-call.h" #include "runtime/buffered-block-mgr.h" +#include "runtime/exec-env.h" #include "runtime/descriptors.h" #include "runtime/data-stream-mgr.h" #include "runtime/data-stream-recvr.h" #include "runtime/mem-tracker.h" #include "runtime/runtime-filter-bank.h" #include "runtime/timestamp-value.h" +#include "runtime/query-state.h" #include "util/bitmap.h" #include "util/cpu-info.h" #include "util/debug-util.h" @@ -46,6 +48,7 @@ #include "util/jni-util.h" #include "util/mem-info.h" #include "util/pretty-printer.h" +#include "util/auth-util.h" // for GetEffectiveUser() #include "common/names.h" @@ -69,28 +72,35 @@ static const int64_t BLOCK_MGR_MEM_MIN_REMAINING = 100 * 1024 * 1024; namespace impala { RuntimeState::RuntimeState( - const TExecPlanFragmentParams& fragment_params, ExecEnv* exec_env) - : obj_pool_(new ObjectPool()), - fragment_params_(fragment_params), + QueryState* query_state, const TPlanFragmentCtx& fragment_ctx, + const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env) + : desc_tbl_(nullptr), + obj_pool_(new ObjectPool()), + query_state_(query_state), + fragment_ctx_(&fragment_ctx), + instance_ctx_(&instance_ctx), now_(new TimestampValue( - query_ctx().now_string.c_str(), query_ctx().now_string.size())), - profile_(obj_pool_.get(), "Fragment " + PrintId(fragment_ctx().fragment_instance_id)), + query_state->query_ctx().now_string.c_str(), + query_state->query_ctx().now_string.size())), + exec_env_(exec_env), + profile_(obj_pool_.get(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)), is_cancelled_(false), root_node_id_(-1) { - Status status = Init(exec_env); - DCHECK(status.ok()) << status.GetDetail(); + Init(); 
} -RuntimeState::RuntimeState(const TQueryCtx& query_ctx) +RuntimeState::RuntimeState(const TQueryCtx& query_ctx, ExecEnv* exec_env) : obj_pool_(new ObjectPool()), - now_(new TimestampValue(query_ctx.now_string.c_str(), - query_ctx.now_string.size())), - exec_env_(ExecEnv::GetInstance()), + query_state_(nullptr), + fragment_ctx_(nullptr), + instance_ctx_(nullptr), + local_query_ctx_(query_ctx), + now_(new TimestampValue(query_ctx.now_string.c_str(), query_ctx.now_string.size())), + exec_env_(exec_env == nullptr ? ExecEnv::GetInstance() : exec_env), profile_(obj_pool_.get(), "<unnamed>"), is_cancelled_(false), root_node_id_(-1) { - fragment_params_.__set_query_ctx(query_ctx); - fragment_params_.query_ctx.request.query_options.__set_batch_size(DEFAULT_BATCH_SIZE); + Init(); } RuntimeState::~RuntimeState() { @@ -110,27 +120,12 @@ RuntimeState::~RuntimeState() { query_mem_tracker_.reset(); } -Status RuntimeState::Init(ExecEnv* exec_env) { +void RuntimeState::Init() { SCOPED_TIMER(profile_.total_time_counter()); - exec_env_ = exec_env; - TQueryOptions& query_options = - fragment_params_.query_ctx.request.query_options; - - // max_errors does not indicate how many errors in total have been recorded, but rather - // how many are distinct. It is defined as the sum of the number of generic errors and - // the number of distinct other errors. 
- if (query_options.max_errors <= 0) { - // TODO: fix linker error and uncomment this - //query_options_.max_errors = FLAGS_max_errors; - query_options.max_errors = 100; - } - if (query_options.batch_size <= 0) { - query_options.__set_batch_size(DEFAULT_BATCH_SIZE); - } // Register with the thread mgr - if (exec_env != NULL) { - resource_pool_ = exec_env->thread_mgr()->RegisterPool(); + if (exec_env_ != NULL) { + resource_pool_ = exec_env_->thread_mgr()->RegisterPool(); DCHECK(resource_pool_ != NULL); } @@ -138,19 +133,16 @@ Status RuntimeState::Init(ExecEnv* exec_env) { total_storage_wait_timer_ = ADD_TIMER(runtime_profile(), "TotalStorageWaitTime"); total_network_send_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkSendTime"); total_network_receive_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkReceiveTime"); - - return Status::OK(); } -void RuntimeState::InitMemTrackers( - const TUniqueId& query_id, const string* pool_name, int64_t query_bytes_limit) { +void RuntimeState::InitMemTrackers(const string* pool_name, int64_t query_bytes_limit) { MemTracker* query_parent_tracker = exec_env_->process_mem_tracker(); if (pool_name != NULL) { query_parent_tracker = MemTracker::GetRequestPoolMemTracker(*pool_name, query_parent_tracker); } query_mem_tracker_ = - MemTracker::GetQueryMemTracker(query_id, query_bytes_limit, query_parent_tracker); + MemTracker::GetQueryMemTracker(query_id(), query_bytes_limit, query_parent_tracker); instance_mem_tracker_.reset(new MemTracker( runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker_.get())); } @@ -314,4 +306,39 @@ void RuntimeState::ReleaseResources() { exec_env_->thread_mgr()->UnregisterPool(resource_pool_); } } + +const std::string& RuntimeState::GetEffectiveUser() const { + return impala::GetEffectiveUser(query_ctx().session); +} + +ImpalaBackendClientCache* RuntimeState::impalad_client_cache() { + return exec_env_->impalad_client_cache(); +} + +CatalogServiceClientCache* 
RuntimeState::catalogd_client_cache() { + return exec_env_->catalogd_client_cache(); +} + +DiskIoMgr* RuntimeState::io_mgr() { + return exec_env_->disk_io_mgr(); +} + +DataStreamMgr* RuntimeState::stream_mgr() { + return exec_env_->stream_mgr(); +} + +HBaseTableFactory* RuntimeState::htable_factory() { + return exec_env_->htable_factory(); +} + +const TQueryCtx& RuntimeState::query_ctx() const { + return query_state_ != nullptr ? query_state_->query_ctx() : local_query_ctx_; +} + +const TQueryOptions& RuntimeState::query_options() const { + // Delegate to query_ctx() so the choice between the shared QueryState's ctx and + // local_query_ctx_ is made in exactly one place. + return query_ctx().client_request.query_options; +} + } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/runtime-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index de3ab94..b56097a 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -26,16 +26,16 @@ // NOTE: try not to add more headers here: runtime-state.h is included in many many files.
#include "common/global-types.h" // for PlanNodeId #include "runtime/client-cache-types.h" -#include "runtime/exec-env.h" #include "runtime/thread-resource-mgr.h" -#include "util/auth-util.h" // for GetEffectiveUser() #include "util/runtime-profile.h" +#include "gen-cpp/ImpalaInternalService_types.h" namespace impala { class BufferedBlockMgr; class DataStreamRecvr; class DescriptorTbl; +class DiskIoMgr; class DiskIoRequestContext; class Expr; class LlvmCodeGen; @@ -45,8 +45,15 @@ class RuntimeFilterBank; class ScalarFnCall; class Status; class TimestampValue; -class TQueryOptions; class TUniqueId; +class ExecEnv; +class DataStreamMgr; +class HBaseTableFactory; +class TPlanFragmentCtx; +class TPlanFragmentInstanceCtx; +class QueryState; + +/// TODO: move the typedefs into a separate .h (and fix the includes for that) /// Counts how many rows an INSERT query has added to a particular partition /// (partitions are identified by their partition keys: k1=v1/k2=v2 @@ -68,10 +75,13 @@ typedef std::map<std::string, std::string> FileMoveMap; /// destruction. class RuntimeState { public: - RuntimeState(const TExecPlanFragmentParams& fragment_params, ExecEnv* exec_env); + /// query_state, fragment_ctx, and instance_ctx need to be alive at least as long as + /// the constructed RuntimeState + RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx, + const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env); /// RuntimeState for executing expr in fe-support. - RuntimeState(const TQueryCtx& query_ctx); + explicit RuntimeState(const TQueryCtx& query_ctx, ExecEnv* exec_env = nullptr); /// Empty d'tor to avoid issues with scoped_ptr. ~RuntimeState(); @@ -82,8 +92,7 @@ class RuntimeState { /// when they are initialized. This function also initializes a user function mem /// tracker (in the fifth level). If 'request_pool' is null, no request pool mem /// tracker is set up, i.e. query pools will have the process mem pool as the parent.
- void InitMemTrackers(const TUniqueId& query_id, const std::string* request_pool, - int64_t query_bytes_limit); + void InitMemTrackers(const std::string* request_pool, int64_t query_bytes_limit); /// Initializes the runtime filter bank. Must be called after InitMemTrackers(). void InitFilterBank(); @@ -91,30 +100,19 @@ class RuntimeState { /// Gets/Creates the query wide block mgr. Status CreateBlockMgr(); + QueryState* query_state() const { return query_state_; } ObjectPool* obj_pool() const { return obj_pool_.get(); } const DescriptorTbl& desc_tbl() const { return *desc_tbl_; } void set_desc_tbl(DescriptorTbl* desc_tbl) { desc_tbl_ = desc_tbl; } - const TQueryOptions& query_options() const { - return query_ctx().request.query_options; - } - int batch_size() const { return query_ctx().request.query_options.batch_size; } - bool abort_on_error() const { - return query_ctx().request.query_options.abort_on_error; - } - bool strict_mode() const { - return query_ctx().request.query_options.strict_mode; - } + const TQueryOptions& query_options() const; + int batch_size() const { return query_options().batch_size; } + bool abort_on_error() const { return query_options().abort_on_error; } + bool strict_mode() const { return query_options().strict_mode; } bool abort_on_default_limit_exceeded() const { - return query_ctx().request.query_options.abort_on_default_limit_exceeded; - } - const TQueryCtx& query_ctx() const { return fragment_params_.query_ctx; } - const TPlanFragmentInstanceCtx& fragment_ctx() const { - return fragment_params_.fragment_instance_ctx; - } - const TExecPlanFragmentParams& fragment_params() const { return fragment_params_; } - const std::string& effective_user() const { - return GetEffectiveUser(query_ctx().session); + return query_options().abort_on_default_limit_exceeded; } + const TQueryCtx& query_ctx() const; + const TPlanFragmentInstanceCtx& instance_ctx() const { return *instance_ctx_; } const TUniqueId& session_id() const { return 
query_ctx().session.session_id; } const std::string& do_as_user() const { return query_ctx().session.delegated_user; } const std::string& connected_user() const { @@ -124,18 +122,16 @@ class RuntimeState { void set_now(const TimestampValue* now); const TUniqueId& query_id() const { return query_ctx().query_id; } const TUniqueId& fragment_instance_id() const { - return fragment_ctx().fragment_instance_id; + return instance_ctx_ != nullptr + ? instance_ctx_->fragment_instance_id + : no_instance_id_; } ExecEnv* exec_env() { return exec_env_; } - DataStreamMgr* stream_mgr() { return exec_env_->stream_mgr(); } - HBaseTableFactory* htable_factory() { return exec_env_->htable_factory(); } - ImpalaBackendClientCache* impalad_client_cache() { - return exec_env_->impalad_client_cache(); - } - CatalogServiceClientCache* catalogd_client_cache() { - return exec_env_->catalogd_client_cache(); - } - DiskIoMgr* io_mgr() { return exec_env_->disk_io_mgr(); } + DataStreamMgr* stream_mgr(); + HBaseTableFactory* htable_factory(); + ImpalaBackendClientCache* impalad_client_cache(); + CatalogServiceClientCache* catalogd_client_cache(); + DiskIoMgr* io_mgr(); MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); } MemTracker* query_mem_tracker() { return query_mem_tracker_.get(); } ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; } @@ -161,6 +157,8 @@ class RuntimeState { /// Returns the LlvmCodeGen object for this fragment instance. LlvmCodeGen* codegen() { return codegen_.get(); } + const std::string& GetEffectiveUser() const; + /// Add ScalarFnCall expression 'udf' to be codegen'd later if it's not disabled by /// query option. This is for cases in which the UDF cannot be interpreted or if the /// plan fragment doesn't contain any codegen enabled operator. @@ -321,15 +319,13 @@ class RuntimeState { friend class TestEnv; /// Set per-fragment state. 
- Status Init(ExecEnv* exec_env); + void Init(); /// Use a custom block manager for the query for testing purposes. void set_block_mgr(const std::shared_ptr<BufferedBlockMgr>& block_mgr) { block_mgr_ = block_mgr; } - static const int DEFAULT_BATCH_SIZE = 1024; - DescriptorTbl* desc_tbl_ = nullptr; boost::scoped_ptr<ObjectPool> obj_pool_; @@ -339,15 +335,23 @@ class RuntimeState { /// Logs error messages. ErrorLogMap error_log_; - /// Original thrift descriptor for this fragment. Includes its unique id, the total - /// number of fragment instances, the query context, the coordinator address, the - /// descriptor table, etc. - TExecPlanFragmentParams fragment_params_; + /// Global QueryState and original thrift descriptors for this fragment instance. + /// Not set by the (const TQueryCtx&) c'tor. + QueryState* const query_state_; + const TPlanFragmentCtx* const fragment_ctx_; + const TPlanFragmentInstanceCtx* const instance_ctx_; + + /// Provides query ctx if query_state_ == nullptr. + TQueryCtx local_query_ctx_; + + /// Provides instance id if instance_ctx_ == nullptr + TUniqueId no_instance_id_; /// Query-global timestamp, e.g., for implementing now(). Set from query_globals_. /// Use pointer to avoid inclusion of timestampvalue.h and avoid clang issues. 
boost::scoped_ptr<TimestampValue> now_; + /// TODO: get rid of this and use ExecEnv::GetInstance() instead ExecEnv* exec_env_; boost::scoped_ptr<LlvmCodeGen> codegen_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/test-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc index 5f98d13..8675ae9 100644 --- a/be/src/runtime/test-env.cc +++ b/be/src/runtime/test-env.cc @@ -65,37 +65,11 @@ TestEnv::~TestEnv() { metrics_.reset(); } -Status TestEnv::CreatePerQueryState(int64_t query_id, int max_buffers, int block_size, - RuntimeState** runtime_state, TQueryOptions* query_options) { - // Enforce invariant that each query ID can be registered at most once. - if (runtime_states_.find(query_id) != runtime_states_.end()) { - return Status(Substitute("Duplicate query id found: $0", query_id)); - } - - TExecPlanFragmentParams plan_params = TExecPlanFragmentParams(); - if (query_options != NULL) plan_params.query_ctx.request.query_options = *query_options; - plan_params.query_ctx.query_id.hi = 0; - plan_params.query_ctx.query_id.lo = query_id; - - *runtime_state = new RuntimeState(plan_params, exec_env_.get()); - (*runtime_state)->InitMemTrackers(TUniqueId(), NULL, -1); - - shared_ptr<BufferedBlockMgr> mgr; - RETURN_IF_ERROR(BufferedBlockMgr::Create(*runtime_state, - (*runtime_state)->query_mem_tracker(), (*runtime_state)->runtime_profile(), - tmp_file_mgr_.get(), CalculateMemLimit(max_buffers, block_size), block_size, &mgr)); - (*runtime_state)->set_block_mgr(mgr); - - runtime_states_[query_id] = shared_ptr<RuntimeState>(*runtime_state); - return Status::OK(); -} - void TestEnv::TearDownRuntimeStates() { for (auto& runtime_state : runtime_states_) runtime_state.second->ReleaseResources(); runtime_states_.clear(); } - int64_t TestEnv::CalculateMemLimit(int max_buffers, int block_size) { DCHECK_GE(max_buffers, -1); if (max_buffers == -1) return -1; 
@@ -109,4 +83,29 @@ int64_t TestEnv::TotalQueryMemoryConsumption() { } return total; } + +Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_size, + const TQueryOptions* query_options, RuntimeState** runtime_state) { + // Enforce invariant that each query ID can be registered at most once. + if (runtime_states_.find(query_id) != runtime_states_.end()) { + return Status(Substitute("Duplicate query id found: $0", query_id)); + } + + TQueryCtx query_ctx; + if (query_options != nullptr) query_ctx.client_request.query_options = *query_options; + query_ctx.query_id.hi = 0; + query_ctx.query_id.lo = query_id; + *runtime_state = new RuntimeState(query_ctx, exec_env_.get()); + // Register before anything can fail: TearDownRuntimeStates() then cleans up on every path. + runtime_states_[query_id] = shared_ptr<RuntimeState>(*runtime_state); + (*runtime_state)->InitMemTrackers(nullptr, -1); + + shared_ptr<BufferedBlockMgr> mgr; + RETURN_IF_ERROR(BufferedBlockMgr::Create(*runtime_state, + (*runtime_state)->query_mem_tracker(), (*runtime_state)->runtime_profile(), + tmp_file_mgr_.get(), CalculateMemLimit(max_buffers, block_size), block_size, &mgr)); + (*runtime_state)->set_block_mgr(mgr); + return Status::OK(); +} + } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/test-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h index 6399110..e648977 100644 --- a/be/src/runtime/test-env.h +++ b/be/src/runtime/test-env.h @@ -23,6 +23,7 @@ #include "runtime/exec-env.h" #include "runtime/mem-tracker.h" #include "runtime/runtime-state.h" +#include "runtime/query-state.h" namespace impala { @@ -39,9 +40,9 @@ class TestEnv { /// Create a RuntimeState for a query with a new block manager and the given query /// options. The RuntimeState is owned by the TestEnv. Returns an error if - /// CreatePerQueryState() has been called with the same query ID already.
- Status CreatePerQueryState(int64_t query_id, int max_buffers, int block_size, - RuntimeState** runtime_state, TQueryOptions* query_options = NULL); + /// CreateQueryState() has been called with the same query ID already. + Status CreateQueryState(int64_t query_id, int max_buffers, int block_size, + const TQueryOptions* query_options, RuntimeState** runtime_state); /// Destroy all RuntimeStates and block managers created by this TestEnv. void TearDownRuntimeStates(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/thread-resource-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/thread-resource-mgr.h b/be/src/runtime/thread-resource-mgr.h index 612e733..8b86dcc 100644 --- a/be/src/runtime/thread-resource-mgr.h +++ b/be/src/runtime/thread-resource-mgr.h @@ -60,6 +60,8 @@ namespace impala { /// - Priorities for different pools /// If both the mgr and pool locks need to be taken, the mgr lock must /// be taken first. 
+/// +/// TODO: make ResourcePool a stand-alone class class ThreadResourceMgr { public: class ResourcePool; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/scheduling/query-schedule.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc index e2dc7c4..668c9c8 100644 --- a/be/src/scheduling/query-schedule.cc +++ b/be/src/scheduling/query-schedule.cc @@ -78,7 +78,7 @@ void QuerySchedule::Init() { if (request_.stmt_type == TStmtType::QUERY) { fragment_exec_params_[root_fragment.idx].is_coord_fragment = true; // the coordinator instance gets index 0, generated instance ids start at 1 - next_instance_id_.lo = 1; + next_instance_id_ = CreateInstanceId(next_instance_id_, 1); } // find max node id http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/scheduling/request-pool-service.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/request-pool-service.cc b/be/src/scheduling/request-pool-service.cc index 9bddde9..b674bf0 100644 --- a/be/src/scheduling/request-pool-service.cc +++ b/be/src/scheduling/request-pool-service.cc @@ -160,7 +160,7 @@ Status RequestPoolService::ResolveRequestPool(const TQueryCtx& ctx, user = DEFAULT_USER; } - const string& requested_pool = ctx.request.query_options.request_pool; + const string& requested_pool = ctx.client_request.query_options.request_pool; TResolveRequestPoolParams params; params.__set_user(user); params.__set_requested_pool(requested_pool); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/scheduling/simple-scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc index ff0337c..95414e4 100644 --- a/be/src/scheduling/simple-scheduler.cc +++ b/be/src/scheduling/simple-scheduler.cc 
@@ -480,7 +480,8 @@ void SimpleScheduler::CreateUnionInstances( void SimpleScheduler::CreateScanInstances( PlanNodeId leftmost_scan_id, FragmentExecParams* fragment_params, QuerySchedule* schedule) { - int max_num_instances = schedule->request().query_ctx.request.query_options.mt_dop; + int max_num_instances = + schedule->request().query_ctx.client_request.query_options.mt_dop; if (max_num_instances == 0) max_num_instances = 1; if (fragment_params->scan_range_assignment.empty()) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt index 35130ff..6d5a897 100644 --- a/be/src/service/CMakeLists.txt +++ b/be/src/service/CMakeLists.txt @@ -24,13 +24,12 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/service") add_library(Service frontend.cc fe-support.cc - fragment-exec-state.cc - fragment-mgr.cc hs2-util.cc impala-server.cc impala-http-handler.cc impala-hs2-server.cc impala-beeswax-server.cc + impala-internal-service.cc query-exec-state.cc query-options.cc query-result-set.cc http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/fe-support.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc index 9415a76..0cc9761 100644 --- a/be/src/service/fe-support.cc +++ b/be/src/service/fe-support.cc @@ -95,7 +95,7 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow( query_ctx.disable_codegen_hint = true; // Allow logging of at least one error, so we can detect and convert it into a // Java exception. - query_ctx.request.query_options.max_errors = 1; + query_ctx.client_request.query_options.max_errors = 1; RuntimeState state(query_ctx); // Make sure to close the runtime state no matter how this scope is exited. 
const auto close_runtime_state = @@ -106,11 +106,11 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow( // Exprs can allocate memory so we need to set up the mem trackers before // preparing/running the exprs. int64_t mem_limit = -1; - if (query_ctx.request.query_options.__isset.mem_limit - && query_ctx.request.query_options.mem_limit > 0) { - mem_limit = query_ctx.request.query_options.mem_limit; + if (query_ctx.client_request.query_options.__isset.mem_limit + && query_ctx.client_request.query_options.mem_limit > 0) { + mem_limit = query_ctx.client_request.query_options.mem_limit; } - state.InitMemTrackers(state.query_id(), NULL, mem_limit); + state.InitMemTrackers(NULL, mem_limit); // Prepare() the exprs. Always Close() the exprs even in case of errors. vector<ExprContext*> expr_ctxs; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/fragment-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc deleted file mode 100644 index 337be82..0000000 --- a/be/src/service/fragment-exec-state.cc +++ /dev/null @@ -1,145 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "service/fragment-exec-state.h" - -#include <sstream> - -#include "codegen/llvm-codegen.h" -#include "gen-cpp/ImpalaInternalService.h" -#include "rpc/thrift-util.h" -#include "gutil/strings/substitute.h" -#include "runtime/runtime-filter-bank.h" -#include "util/bloom-filter.h" -#include "runtime/backend-client.h" - -#include "common/names.h" - -using namespace apache::thrift; -using namespace strings; -using namespace impala; - -Status FragmentMgr::FragmentExecState::UpdateStatus(const Status& status) { - lock_guard<mutex> l(status_lock_); - if (!status.ok() && exec_status_.ok()) exec_status_ = status; - return exec_status_; -} - -Status FragmentMgr::FragmentExecState::Cancel() { - lock_guard<mutex> l(status_lock_); - RETURN_IF_ERROR(exec_status_); - executor_.Cancel(); - return Status::OK(); -} - -void FragmentMgr::FragmentExecState::Exec() { - Status status = executor_.Prepare(exec_params_); - prepare_promise_.Set(status); - if (status.ok()) { - if (executor_.Open().ok()) { - executor_.Exec(); - } - } - executor_.Close(); -} - -void FragmentMgr::FragmentExecState::ReportStatusCb( - const Status& status, RuntimeProfile* profile, bool done) { - DCHECK(status.ok() || done); // if !status.ok() => done - Status exec_status = UpdateStatus(status); - - Status coord_status; - ImpalaBackendConnection coord(client_cache_, coord_address(), &coord_status); - if (!coord_status.ok()) { - stringstream s; - s << "Couldn't get a client for " << coord_address() <<"\tReason: " - << coord_status.GetDetail(); - UpdateStatus(Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, s.str()))); - return; - } - - TReportExecStatusParams params; - params.protocol_version = ImpalaInternalServiceVersion::V1; - params.__set_query_id(query_ctx_.query_id); - params.__set_fragment_instance_id(fragment_instance_ctx_.fragment_instance_id); - exec_status.SetTStatus(¶ms); - 
params.__set_done(done); - - if (profile != NULL) { - profile->ToThrift(¶ms.profile); - params.__isset.profile = true; - } - - RuntimeState* runtime_state = executor_.runtime_state(); - // If executor_ did not successfully prepare, runtime state may not have been set. - if (runtime_state != NULL) { - // Only send updates to insert status if fragment is finished, the coordinator - // waits until query execution is done to use them anyhow. - if (done) { - TInsertExecStatus insert_status; - - if (runtime_state->hdfs_files_to_move()->size() > 0) { - insert_status.__set_files_to_move(*runtime_state->hdfs_files_to_move()); - } - if (runtime_state->per_partition_status()->size() > 0) { - insert_status.__set_per_partition_status(*runtime_state->per_partition_status()); - } - - params.__set_insert_exec_status(insert_status); - } - - // Send new errors to coordinator - runtime_state->GetUnreportedErrors(&(params.error_log)); - } - params.__isset.error_log = (params.error_log.size() > 0); - - TReportExecStatusResult res; - Status rpc_status; - bool retry_is_safe; - // Try to send the RPC 3 times before failing. - for (int i = 0; i < 3; ++i) { - rpc_status = coord.DoRpc(&ImpalaBackendClient::ReportExecStatus, params, &res, - &retry_is_safe); - if (rpc_status.ok()) { - rpc_status = Status(res.status); - break; - } - if (!retry_is_safe) break; - if (i < 2) SleepForMs(100); - } - if (!rpc_status.ok()) { - UpdateStatus(rpc_status); - executor_.Cancel(); - } -} - -void FragmentMgr::FragmentExecState::PublishFilter(int32_t filter_id, - const TBloomFilter& thrift_bloom_filter) { - // Defensively protect against blocking forever in case there's some problem with - // Prepare(). - static const int WAIT_MS = 30000; - bool timed_out = false; - // Wait until Prepare() is done, so we know that the filter bank is set up. 
- Status prepare_status = prepare_promise_.Get(WAIT_MS, &timed_out); - if (timed_out) { - LOG(ERROR) << "Unexpected timeout in PublishFilter()"; - return; - } - if (!prepare_status.ok()) return; - executor_.runtime_state()->filter_bank()->PublishGlobalFilter(filter_id, - thrift_bloom_filter); -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/fragment-exec-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-exec-state.h b/be/src/service/fragment-exec-state.h deleted file mode 100644 index 1c64f2d..0000000 --- a/be/src/service/fragment-exec-state.h +++ /dev/null @@ -1,105 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_SERVICE_FRAGMENT_EXEC_STATE_H -#define IMPALA_SERVICE_FRAGMENT_EXEC_STATE_H - -#include <boost/bind.hpp> -#include <boost/thread/mutex.hpp> - -#include "common/status.h" -#include "runtime/client-cache.h" -#include "runtime/plan-fragment-executor.h" -#include "service/fragment-mgr.h" - -namespace impala { - -/// Execution state of a single plan fragment. 
-class FragmentMgr::FragmentExecState { - public: - FragmentExecState(const TExecPlanFragmentParams& params, ExecEnv* exec_env) - : query_ctx_(params.query_ctx), fragment_instance_ctx_(params.fragment_instance_ctx), - executor_(exec_env, boost::bind<void>( - boost::mem_fn(&FragmentMgr::FragmentExecState::ReportStatusCb), - this, _1, _2, _3)), - client_cache_(exec_env->impalad_client_cache()), exec_params_(params) { - } - - /// Calling the d'tor releases all memory and closes all data streams - /// held by executor_. - ~FragmentExecState() { } - - /// Returns current execution status, if there was an error. Otherwise cancels - /// the fragment and returns OK. - Status Cancel(); - - /// Main loop of plan fragment execution. Blocks until execution finishes. - void Exec(); - - const TUniqueId& query_id() const { return query_ctx_.query_id; } - - const TUniqueId& fragment_instance_id() const { - return fragment_instance_ctx_.fragment_instance_id; - } - - const TNetworkAddress& coord_address() const { return query_ctx_.coord_address; } - - /// Set the execution thread, taking ownership of the object. - void set_exec_thread(Thread* exec_thread) { exec_thread_.reset(exec_thread); } - - /// Publishes filter with ID 'filter_id' to this fragment's filter bank. - void PublishFilter(int32_t filter_id, const TBloomFilter& thrift_bloom_filter); - - PlanFragmentExecutor* executor() { return &executor_; } - - private: - TQueryCtx query_ctx_; - TPlanFragmentInstanceCtx fragment_instance_ctx_; - PlanFragmentExecutor executor_; - ImpalaBackendClientCache* client_cache_; - TExecPlanFragmentParams exec_params_; - - /// the thread executing this plan fragment - boost::scoped_ptr<Thread> exec_thread_; - - /// protects exec_status_ - boost::mutex status_lock_; - - /// set in ReportStatusCb(); - /// if set to anything other than OK, execution has terminated w/ an error - Status exec_status_; - - /// Barrier for the completion of executor_.Prepare(). 
- Promise<Status> prepare_promise_; - - /// Update 'exec_status_' w/ 'status', if the former is not already an error. - /// Returns the value of 'exec_status_' after this method completes. - Status UpdateStatus(const Status& status); - - /// Callback for executor; updates exec_status_ if 'status' indicates an error - /// or if there was a thrift error. - /// If not NULL, `profile` is encoded as a Thrift structure and transmitted as part of - /// the reporting RPC. `profile` may be NULL if a runtime profile has not been created - /// for this fragment (e.g. when the fragment has failed during preparation). - /// The executor must ensure that there is only one invocation at a time. - void ReportStatusCb(const Status& status, RuntimeProfile* profile, bool done); - -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/fragment-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc deleted file mode 100644 index 8e8fc05..0000000 --- a/be/src/service/fragment-mgr.cc +++ /dev/null @@ -1,154 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "service/fragment-mgr.h" - -#include <boost/lexical_cast.hpp> -#include <gperftools/malloc_extension.h> -#include <gutil/strings/substitute.h> - -#include "service/fragment-exec-state.h" -#include "runtime/exec-env.h" -#include "runtime/mem-tracker.h" -#include "util/impalad-metrics.h" -#include "util/uid-util.h" - -#include "common/names.h" - -using namespace impala; -using namespace strings; - -// TODO: this logging should go into a per query log. -DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage " - "every log_mem_usage_interval'th fragment completion."); - -Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params) { - VLOG_QUERY << "ExecPlanFragment() instance_id=" - << PrintId(exec_params.fragment_instance_ctx.fragment_instance_id) - << " coord=" << exec_params.query_ctx.coord_address; - - // Preparing and opening the fragment creates a thread and consumes a non-trivial - // amount of memory. If we are already starved for memory, cancel the fragment as - // early as possible to avoid digging the hole deeper. - MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker(); - if (process_mem_tracker->LimitExceeded()) { - string msg = Substitute("Instance $0 of plan fragment $1 of query $2 could not " - "start because the backend Impala daemon is over its memory limit", - PrintId(exec_params.fragment_instance_ctx.fragment_instance_id), - exec_params.fragment_ctx.fragment.display_name, - PrintId(exec_params.query_ctx.query_id)); - return process_mem_tracker->MemLimitExceeded(NULL, msg, 0); - } - - shared_ptr<FragmentExecState> exec_state( - new FragmentExecState(exec_params, ExecEnv::GetInstance())); - - // Register exec_state before this RPC returns so that async Cancel() calls (which can - // only happen after this RPC returns) can always find this fragment. 
- { - lock_guard<SpinLock> l(fragment_exec_state_map_lock_); - DCHECK(fragment_exec_state_map_.find(exec_state->fragment_instance_id()) - == fragment_exec_state_map_.end()); - fragment_exec_state_map_.insert( - make_pair(exec_state->fragment_instance_id(), exec_state)); - } - - ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L); - ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L); - - // Execute plan fragment in new thread. - // TODO: manage threads via global thread pool? - const TUniqueId& fragment_id = exec_state->fragment_instance_id(); - exec_state->set_exec_thread(new Thread("fragment-mgr", - Substitute("exec-plan-fragment-$0", PrintId(fragment_id)), - &FragmentMgr::FragmentThread, this, fragment_id)); - - return Status::OK(); -} - -void FragmentMgr::FragmentThread(TUniqueId fragment_instance_id) { - shared_ptr<FragmentExecState> exec_state = GetFragmentExecState(fragment_instance_id); - if (exec_state.get() == NULL) return; - exec_state->Exec(); - - // We're done with this plan fragment - { - lock_guard<SpinLock> l(fragment_exec_state_map_lock_); - size_t num_erased = - fragment_exec_state_map_.erase(exec_state->fragment_instance_id()); - DCHECK_EQ(num_erased, 1); - } - // TODO: this might be imprecise, if another client of FragmentMgr has a reference to - // the fragment exec state. - ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L); - - VLOG_QUERY << "PlanFragment completed. 
instance_id=" << fragment_instance_id; - -#ifndef ADDRESS_SANITIZER - // tcmalloc and address sanitizer can not be used together - if (FLAGS_log_mem_usage_interval > 0) { - uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->value(); - if (num_complete % FLAGS_log_mem_usage_interval == 0) { - char buf[2048]; - // This outputs how much memory is currently being used by this impalad - MallocExtension::instance()->GetStats(buf, 2048); - LOG(INFO) << buf; - } - } -#endif -} - -shared_ptr<FragmentMgr::FragmentExecState> FragmentMgr::GetFragmentExecState( - const TUniqueId& fragment_instance_id) { - lock_guard<SpinLock> l(fragment_exec_state_map_lock_); - FragmentExecStateMap::const_iterator i = - fragment_exec_state_map_.find(fragment_instance_id); - if (i == fragment_exec_state_map_.end()) return shared_ptr<FragmentExecState>(); - - return i->second; -} - -void FragmentMgr::CancelPlanFragment(TCancelPlanFragmentResult& return_val, - const TCancelPlanFragmentParams& params) { - VLOG_QUERY << "CancelPlanFragment(): instance_id=" << params.fragment_instance_id; - shared_ptr<FragmentExecState> exec_state = - GetFragmentExecState(params.fragment_instance_id); - if (exec_state.get() == NULL) { - Status status(ErrorMsg(TErrorCode::INTERNAL_ERROR, - Substitute("Unknown fragment id: $0", - lexical_cast<string>(params.fragment_instance_id)))); - status.SetTStatus(&return_val); - return; - } - // we only initiate cancellation here, the map entry as well as the exec state - // are removed when fragment execution terminates (which is at present still - // running in exec_state->exec_thread_) - exec_state->Cancel().SetTStatus(&return_val); -} - -void FragmentMgr::PublishFilter(TPublishFilterResult& return_val, - const TPublishFilterParams& params) { - VLOG_FILE << "PublishFilter(): dst_instance_id=" << params.dst_instance_id; - shared_ptr<FragmentExecState> fragment_exec_state = - GetFragmentExecState(params.dst_instance_id); - if (fragment_exec_state.get() == NULL) 
{ - LOG(INFO) << "Unknown fragment (ID: " << params.dst_instance_id - << ") for filter (ID: " << params.filter_id << ")"; - return; - } - fragment_exec_state->PublishFilter(params.filter_id, params.bloom_filter); -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/fragment-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-mgr.h b/be/src/service/fragment-mgr.h deleted file mode 100644 index 1a180cf..0000000 --- a/be/src/service/fragment-mgr.h +++ /dev/null @@ -1,89 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_SERVICE_FRAGMENT_MGR_H -#define IMPALA_SERVICE_FRAGMENT_MGR_H - -#include <boost/thread/mutex.hpp> -#include <boost/unordered_map.hpp> - -#include "gen-cpp/ImpalaInternalService.h" -#include "common/status.h" -#include "util/spinlock.h" - -namespace impala { - -/// Manages execution of individual plan fragment instances, which are typically run as a -/// result of ExecPlanFragment() RPCs that arrive via the internal Impala interface. -// -/// A fragment is started in ExecPlanFragment(), which starts a thread which runs -/// FragmentThread() asynchronously and returns. 
Fragments are Prepare()'d in that thread, -/// and then Exec() is called. At any point a fragment may terminate either by -/// cancellation via CancelPlanFragment(), or when FragmentThread() returns. -// -/// TODO: Remove Thrift args from methods where it would improve readability; -/// ImpalaInternalService can take care of translation to / from Thrift, as it already -/// does for ExecPlanFragment()'s return value. -class FragmentMgr { - public: - /// Registers a new FragmentExecState and launches the thread that calls Prepare() and - /// Exec() on it. - /// - /// Returns an error if there was some unrecoverable problem before the fragment was - /// registered (like low memory). Otherwise, returns OK, which guarantees that the - /// fragment is registered and must either run to completion or be cancelled. - /// - /// After this call returns, it is legal to call CancelPlanFragment() on this - /// fragment. If this call returns an error, CancelPlanFragment() will be a no-op - /// (because the fragment is unregistered). - Status ExecPlanFragment(const TExecPlanFragmentParams& params); - - /// Cancels a plan fragment that is running asynchronously. - void CancelPlanFragment(TCancelPlanFragmentResult& return_val, - const TCancelPlanFragmentParams& params); - - class FragmentExecState; - - /// Returns a shared pointer to the FragmentExecState if one can be found for the - /// given id. If the id is not found, the shared pointer will contain NULL. - std::shared_ptr<FragmentExecState> GetFragmentExecState( - const TUniqueId& fragment_instance_id); - - /// Publishes a global runtime filter to a local fragment. 
- void PublishFilter(TPublishFilterResult& return_val, - const TPublishFilterParams& params); - - private: - /// protects fragment_exec_state_map_ - SpinLock fragment_exec_state_map_lock_; - - /// Map from fragment instance id to exec state; FragmentExecState is owned by us and - /// referenced as a shared_ptr to allow asynchronous calls to CancelPlanFragment() - typedef boost::unordered_map<TUniqueId, std::shared_ptr<FragmentExecState>> - FragmentExecStateMap; - FragmentExecStateMap fragment_exec_state_map_; - - /// Retrieves the 'exec_state' corresponding to fragment_instance_id. Calls - /// exec_state->Prepare() and then exec_state->Exec(). Finally unregisters the - /// exec_state from fragment_exec_state_map_. The exec_state must previously have been - /// registered in fragment_exec_state_map_. Runs in the fragment's execution thread. - void FragmentThread(TUniqueId fragment_instance_id); -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/impala-beeswax-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc index e740be2..ad4963b 100644 --- a/be/src/service/impala-beeswax-server.cc +++ b/be/src/service/impala-beeswax-server.cc @@ -30,6 +30,8 @@ #include "service/query-result-set.h" #include "util/impalad-metrics.h" #include "util/webserver.h" +#include "util/runtime-profile.h" +#include "util/runtime-profile-counters.h" #include "common/names.h" @@ -66,7 +68,7 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) { RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state), SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION); - exec_state->UpdateNonErrorQueryState(QueryState::RUNNING); + exec_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING); // start thread to wait for results to become available, which will allow // us to advance query state to FINISHED or EXCEPTION 
exec_state->WaitAsync(); @@ -107,7 +109,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query, RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state), SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION); - exec_state->UpdateNonErrorQueryState(QueryState::RUNNING); + exec_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING); // Once the query is running do a final check for session closure and add it to the // set of in-flight queries. Status status = SetQueryInflight(session, exec_state); @@ -123,7 +125,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query, RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR); } - exec_state->UpdateNonErrorQueryState(QueryState::FINISHED); + exec_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED); TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle); // If the input log context id is an empty string, then create a new number and @@ -146,7 +148,7 @@ void ImpalaServer::explain(QueryExplanation& query_explanation, const Query& que exec_env_->frontend()->GetExplainPlan(query_ctx, &query_explanation.textual), SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION); query_explanation.__isset.textual = true; - VLOG_QUERY << "explain():\nstmt=" << query_ctx.request.stmt + VLOG_QUERY << "explain():\nstmt=" << query_ctx.client_request.stmt << "\nplan: " << query_explanation.textual; } @@ -233,7 +235,7 @@ void ImpalaServer::close(const QueryHandle& handle) { RAISE_IF_ERROR(UnregisterQuery(query_id, true), SQLSTATE_GENERAL_ERROR); } -QueryState::type ImpalaServer::get_state(const QueryHandle& handle) { +beeswax::QueryState::type ImpalaServer::get_state(const QueryHandle& handle) { ScopedSessionState session_handle(this); RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()), SQLSTATE_GENERAL_ERROR); @@ -250,7 +252,7 @@ QueryState::type ImpalaServer::get_state(const QueryHandle& handle) { RaiseBeeswaxException("Invalid query handle", 
SQLSTATE_GENERAL_ERROR); } // dummy to keep compiler happy - return QueryState::FINISHED; + return beeswax::QueryState::FINISHED; } void ImpalaServer::echo(string& echo_string, const string& input_string) { @@ -393,7 +395,7 @@ void ImpalaServer::ResetTable(impala::TStatus& status, const TResetTableReq& req Status ImpalaServer::QueryToTQueryContext(const Query& query, TQueryCtx* query_ctx) { - query_ctx->request.stmt = query.query; + query_ctx->client_request.stmt = query.query; VLOG_QUERY << "query: " << ThriftDebugString(query); QueryOptionsMask set_query_options_mask; { @@ -407,7 +409,7 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query, // set yet, set it now. lock_guard<mutex> l(session->lock); if (session->connected_user.empty()) session->connected_user = query.hadoop_user; - query_ctx->request.query_options = session->default_query_options; + query_ctx->client_request.query_options = session->default_query_options; set_query_options_mask = session->set_query_options_mask; } session->ToThrift(session_id, &query_ctx->session); @@ -416,7 +418,7 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query, // Override default query options with Query.Configuration if (query.__isset.configuration) { for (const string& option: query.configuration) { - RETURN_IF_ERROR(ParseQueryOptions(option, &query_ctx->request.query_options, + RETURN_IF_ERROR(ParseQueryOptions(option, &query_ctx->client_request.query_options, &set_query_options_mask)); } } @@ -425,7 +427,7 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query, // pool options. 
AddPoolQueryOptions(query_ctx, ~set_query_options_mask); VLOG_QUERY << "TClientRequest.queryOptions: " - << ThriftDebugString(query_ctx->request.query_options); + << ThriftDebugString(query_ctx->client_request.query_options); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/impala-hs2-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index ab4d1f3..5313b7d 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -36,6 +36,7 @@ #include "exprs/expr.h" #include "rpc/thrift-util.h" #include "runtime/raw-value.h" +#include "runtime/exec-env.h" #include "service/hs2-util.h" #include "service/query-exec-state.h" #include "service/query-options.h" @@ -87,7 +88,15 @@ const string IMPALA_RESULT_CACHING_OPT = "impala.resultset.cache.size"; // Helper function to translate between Beeswax and HiveServer2 type static TOperationState::type QueryStateToTOperationState( - const beeswax::QueryState::type& query_state); + const beeswax::QueryState::type& query_state) { + switch (query_state) { + case beeswax::QueryState::CREATED: return TOperationState::INITIALIZED_STATE; + case beeswax::QueryState::RUNNING: return TOperationState::RUNNING_STATE; + case beeswax::QueryState::FINISHED: return TOperationState::FINISHED_STATE; + case beeswax::QueryState::EXCEPTION: return TOperationState::ERROR_STATE; + default: return TOperationState::UKNOWN_STATE; + } +} void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, TMetadataOpRequest* request, TOperationHandle* handle, thrift::TStatus* status) { @@ -131,7 +140,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, _TMetadataOpcode_VALUES_TO_NAMES.find(request->opcode); const string& query_text = query_text_it == _TMetadataOpcode_VALUES_TO_NAMES.end() ? 
"N/A" : query_text_it->second; - query_ctx.request.stmt = query_text; + query_ctx.client_request.stmt = query_text; exec_state.reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(), this, session)); Status register_status = RegisterQuery(session, exec_state); @@ -151,7 +160,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, return; } - exec_state->UpdateNonErrorQueryState(QueryState::FINISHED); + exec_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED); Status inflight_status = SetQueryInflight(session, exec_state); if (!inflight_status.ok()) { @@ -215,7 +224,7 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size Status ImpalaServer::TExecuteStatementReqToTQueryContext( const TExecuteStatementReq execute_request, TQueryCtx* query_ctx) { - query_ctx->request.stmt = execute_request.statement; + query_ctx->client_request.stmt = execute_request.statement; VLOG_QUERY << "TExecuteStatementReq: " << ThriftDebugString(execute_request); QueryOptionsMask set_query_options_mask; { @@ -228,7 +237,7 @@ Status ImpalaServer::TExecuteStatementReqToTQueryContext( RETURN_IF_ERROR(GetSessionState(session_id, &session_state)); session_state->ToThrift(session_id, &query_ctx->session); lock_guard<mutex> l(session_state->lock); - query_ctx->request.query_options = session_state->default_query_options; + query_ctx->client_request.query_options = session_state->default_query_options; set_query_options_mask = session_state->set_query_options_mask; } @@ -243,14 +252,14 @@ Status ImpalaServer::TExecuteStatementReqToTQueryContext( continue; } RETURN_IF_ERROR(SetQueryOption(conf_itr->first, conf_itr->second, - &query_ctx->request.query_options, &set_query_options_mask)); + &query_ctx->client_request.query_options, &set_query_options_mask)); } } // Only query options not set in the session or confOverlay can be overridden by the // pool options. 
AddPoolQueryOptions(query_ctx, ~set_query_options_mask); VLOG_QUERY << "TClientRequest.queryOptions: " - << ThriftDebugString(query_ctx->request.query_options); + << ThriftDebugString(query_ctx->client_request.query_options); return Status::OK(); } @@ -455,7 +464,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val, HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR); } } - exec_state->UpdateNonErrorQueryState(QueryState::RUNNING); + exec_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING); // Start thread to wait for results to become available. exec_state->WaitAsync(); // Once the query is running do a final check for session closure and add it to the @@ -878,14 +887,5 @@ void ImpalaServer::RenewDelegationToken(TRenewDelegationTokenResp& return_val, return_val.status.__set_errorMessage("Not implemented"); } -TOperationState::type QueryStateToTOperationState(const QueryState::type& query_state) { - switch (query_state) { - case QueryState::CREATED: return TOperationState::INITIALIZED_STATE; - case QueryState::RUNNING: return TOperationState::RUNNING_STATE; - case QueryState::FINISHED: return TOperationState::FINISHED_STATE; - case QueryState::EXCEPTION: return TOperationState::ERROR_STATE; - default: return TOperationState::UKNOWN_STATE; - } -} } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/impala-http-handler.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc index b675f3a..a324b1f 100644 --- a/be/src/service/impala-http-handler.cc +++ b/be/src/service/impala-http-handler.cc @@ -25,6 +25,7 @@ #include "catalog/catalog-util.h" #include "gen-cpp/beeswax_types.h" #include "runtime/mem-tracker.h" +#include "runtime/exec-env.h" #include "service/impala-server.h" #include "service/query-exec-state.h" #include "thrift/protocol/TDebugProtocol.h" 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/impala-internal-service.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc new file mode 100644 index 0000000..fff7f9e --- /dev/null +++ b/be/src/service/impala-internal-service.cc @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "service/impala-internal-service.h" + +#include <boost/lexical_cast.hpp> + +#include "common/status.h" +#include "service/impala-server.h" +#include "runtime/query-exec-mgr.h" +#include "runtime/query-state.h" +#include "runtime/fragment-instance-state.h" +#include "runtime/exec-env.h" +#include "testutil/fault-injection-util.h" + +using namespace impala; + +ImpalaInternalService::ImpalaInternalService() { + impala_server_ = ExecEnv::GetInstance()->impala_server(); + DCHECK(impala_server_ != nullptr); + query_exec_mgr_ = ExecEnv::GetInstance()->query_exec_mgr(); + DCHECK(query_exec_mgr_ != nullptr); +} + +void ImpalaInternalService::ExecPlanFragment(TExecPlanFragmentResult& return_val, + const TExecPlanFragmentParams& params) { + VLOG_QUERY << "ExecPlanFragment():" + << " instance_id=" << params.fragment_instance_ctx.fragment_instance_id; + FAULT_INJECTION_RPC_DELAY(RPC_EXECPLANFRAGMENT); + query_exec_mgr_->StartFInstance(params).SetTStatus(&return_val); +} + +template <typename T> void SetUnknownIdError( + const string& id_type, const TUniqueId& id, T* status_container) { + Status status(ErrorMsg(TErrorCode::INTERNAL_ERROR, + Substitute("Unknown $0 id: $1", id_type, lexical_cast<string>(id)))); + status.SetTStatus(status_container); +} + +void ImpalaInternalService::CancelPlanFragment(TCancelPlanFragmentResult& return_val, + const TCancelPlanFragmentParams& params) { + VLOG_QUERY << "CancelPlanFragment(): instance_id=" << params.fragment_instance_id; + FAULT_INJECTION_RPC_DELAY(RPC_CANCELPLANFRAGMENT); + QueryState::ScopedRef qs(GetQueryId(params.fragment_instance_id)); + if (qs.get() == nullptr) { + SetUnknownIdError("query", GetQueryId(params.fragment_instance_id), &return_val); + return; + } + FragmentInstanceState* fis = qs->GetFInstanceState(params.fragment_instance_id); + if (fis == nullptr) { + SetUnknownIdError("instance", params.fragment_instance_id, &return_val); + return; + } + Status status = fis->Cancel(); + status.SetTStatus(&return_val); 
+} + +void ImpalaInternalService::ReportExecStatus(TReportExecStatusResult& return_val, + const TReportExecStatusParams& params) { + VLOG_QUERY << "ReportExecStatus(): instance_id=" << params.fragment_instance_id; + FAULT_INJECTION_RPC_DELAY(RPC_REPORTEXECSTATUS); + impala_server_->ReportExecStatus(return_val, params); +} + +void ImpalaInternalService::TransmitData(TTransmitDataResult& return_val, + const TTransmitDataParams& params) { + FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA); + impala_server_->TransmitData(return_val, params); +} + +void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val, + const TUpdateFilterParams& params) { + VLOG_QUERY << "UpdateFilter(): filter=" << params.filter_id + << " query_id=" << PrintId(params.query_id); + FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER); + impala_server_->UpdateFilter(return_val, params); +} + +void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val, + const TPublishFilterParams& params) { + VLOG_QUERY << "PublishFilter(): filter=" << params.filter_id + << " instance_id=" << PrintId(params.dst_instance_id); + FAULT_INJECTION_RPC_DELAY(RPC_PUBLISHFILTER); + QueryState::ScopedRef qs(GetQueryId(params.dst_instance_id)); + if (qs.get() == nullptr) return; + FragmentInstanceState* fis = qs->GetFInstanceState(params.dst_instance_id); + if (fis == nullptr) return; + fis->PublishFilter(params.filter_id, params.bloom_filter); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/impala-internal-service.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h index af54c35..372d617 100644 --- a/be/src/service/impala-internal-service.h +++ b/be/src/service/impala-internal-service.h @@ -20,65 +20,33 @@ #include "gen-cpp/ImpalaInternalService.h" #include "gen-cpp/ImpalaInternalService_types.h" -#include "service/impala-server.h" -#include 
"service/fragment-mgr.h" -#include "testutil/fault-injection-util.h" namespace impala { +class ImpalaServer; +class QueryExecMgr; + /// Proxies Thrift RPC requests onto their implementing objects for the /// ImpalaInternalService service. class ImpalaInternalService : public ImpalaInternalServiceIf { public: - ImpalaInternalService() { - impala_server_ = ExecEnv::GetInstance()->impala_server(); - DCHECK(impala_server_ != nullptr); - fragment_mgr_ = ExecEnv::GetInstance()->fragment_mgr(); - DCHECK(fragment_mgr_ != nullptr); - } - + ImpalaInternalService(); virtual void ExecPlanFragment(TExecPlanFragmentResult& return_val, - const TExecPlanFragmentParams& params) { - FAULT_INJECTION_RPC_DELAY(RPC_EXECPLANFRAGMENT); - fragment_mgr_->ExecPlanFragment(params).SetTStatus(&return_val); - } - + const TExecPlanFragmentParams& params); virtual void CancelPlanFragment(TCancelPlanFragmentResult& return_val, - const TCancelPlanFragmentParams& params) { - FAULT_INJECTION_RPC_DELAY(RPC_CANCELPLANFRAGMENT); - fragment_mgr_->CancelPlanFragment(return_val, params); - } - + const TCancelPlanFragmentParams& params); virtual void ReportExecStatus(TReportExecStatusResult& return_val, - const TReportExecStatusParams& params) { - FAULT_INJECTION_RPC_DELAY(RPC_REPORTEXECSTATUS); - impala_server_->ReportExecStatus(return_val, params); - } - + const TReportExecStatusParams& params); virtual void TransmitData(TTransmitDataResult& return_val, - const TTransmitDataParams& params) { - FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA); - impala_server_->TransmitData(return_val, params); - } - + const TTransmitDataParams& params); virtual void UpdateFilter(TUpdateFilterResult& return_val, - const TUpdateFilterParams& params) { - FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER); - impala_server_->UpdateFilter(return_val, params); - } - + const TUpdateFilterParams& params); virtual void PublishFilter(TPublishFilterResult& return_val, - const TPublishFilterParams& params) { - 
FAULT_INJECTION_RPC_DELAY(RPC_PUBLISHFILTER); - fragment_mgr_->PublishFilter(return_val, params); - } + const TPublishFilterParams& params); private: - /// Manages fragment reporting and data transmission ImpalaServer* impala_server_; - - /// Manages fragment execution - FragmentMgr* fragment_mgr_; + QueryExecMgr* query_exec_mgr_; }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 5e08224..a5f176b 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -54,7 +54,6 @@ #include "runtime/lib-cache.h" #include "runtime/timestamp-value.h" #include "runtime/tmp-file-mgr.h" -#include "service/fragment-exec-state.h" #include "service/impala-internal-service.h" #include "service/impala-http-handler.h" #include "service/query-exec-state.h" @@ -72,6 +71,8 @@ #include "util/string-parser.h" #include "util/summary-util.h" #include "util/uid-util.h" +#include "util/runtime-profile.h" +#include "util/runtime-profile-counters.h" #include "gen-cpp/Types_types.h" #include "gen-cpp/ImpalaService.h" @@ -742,7 +743,7 @@ void ImpalaServer::AddPoolQueryOptions(TQueryCtx* ctx, << " override_options_mask=" << override_options_mask.to_string() << " set_pool_mask=" << set_pool_options_mask.to_string() << " overlay_mask=" << overlay_mask.to_string(); - OverlayQueryOptions(pool_options, overlay_mask, &ctx->request.query_options); + OverlayQueryOptions(pool_options, overlay_mask, &ctx->client_request.query_options); } Status ImpalaServer::Execute(TQueryCtx* query_ctx, @@ -752,9 +753,9 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx, ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L); // Redact the SQL stmt and update the query context - string stmt = replace_all_copy(query_ctx->request.stmt, "\n", " "); + string stmt = 
replace_all_copy(query_ctx->client_request.stmt, "\n", " "); Redact(&stmt); - query_ctx->request.__set_redacted_stmt((const string) stmt); + query_ctx->client_request.__set_redacted_stmt((const string) stmt); bool registered_exec_state; Status status = ExecuteInternal(*query_ctx, session_state, ®istered_exec_state, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/impala-server.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index d87af39..e2f9c67 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -45,6 +45,7 @@ #include "runtime/runtime-state.h" #include "runtime/timestamp-value.h" #include "runtime/types.h" +#include "statestore/statestore-subscriber.h" namespace impala { @@ -259,8 +260,9 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// - incoming_topic_deltas: all changes to registered statestore topics /// - subscriber_topic_updates: output parameter to publish any topic updates to. /// Currently unused. 
- void MembershipCallback(const StatestoreSubscriber::TopicDeltaMap& - incoming_topic_deltas, std::vector<TTopicDelta>* subscriber_topic_updates); + void MembershipCallback( + const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas, + std::vector<TTopicDelta>* subscriber_topic_updates); void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& topic_deltas, std::vector<TTopicDelta>* topic_updates); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/query-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc index 6075914..6fa6d4a 100644 --- a/be/src/service/query-exec-state.cc +++ b/be/src/service/query-exec-state.cc @@ -24,6 +24,8 @@ #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" +#include "runtime/exec-env.h" +#include "scheduling/scheduler.h" #include "service/frontend.h" #include "service/impala-server.h" #include "service/query-options.h" @@ -114,7 +116,7 @@ ImpalaServer::QueryExecState::QueryExecState( summary_profile_.AddInfoString("Network Address", lexical_cast<string>(session_->network_address)); summary_profile_.AddInfoString("Default Db", default_db()); - summary_profile_.AddInfoString("Sql Statement", query_ctx_.request.stmt); + summary_profile_.AddInfoString("Sql Statement", query_ctx_.client_request.stmt); summary_profile_.AddInfoString("Coordinator", TNetworkAddressToString(exec_env->backend_address())); } @@ -145,7 +147,7 @@ Status ImpalaServer::QueryExecState::Exec(TExecRequest* exec_request) { summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type())); summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); summary_profile_.AddInfoString("Query Options (non default)", - DebugQueryOptions(query_ctx_.request.query_options)); + DebugQueryOptions(query_ctx_.client_request.query_options)); switch 
(exec_request->stmt_type) { case TStmtType::QUERY: @@ -615,7 +617,7 @@ void ImpalaServer::QueryExecState::Wait() { UpdateQueryStatus(status); } if (status.ok()) { - UpdateNonErrorQueryState(QueryState::FINISHED); + UpdateNonErrorQueryState(beeswax::QueryState::FINISHED); } } @@ -691,16 +693,17 @@ Status ImpalaServer::QueryExecState::RestartFetch() { return Status::OK(); } -void ImpalaServer::QueryExecState::UpdateNonErrorQueryState(QueryState::type query_state) { +void ImpalaServer::QueryExecState::UpdateNonErrorQueryState( + beeswax::QueryState::type query_state) { lock_guard<mutex> l(lock_); - DCHECK(query_state != QueryState::EXCEPTION); + DCHECK(query_state != beeswax::QueryState::EXCEPTION); if (query_state_ < query_state) query_state_ = query_state; } Status ImpalaServer::QueryExecState::UpdateQueryStatus(const Status& status) { // Preserve the first non-ok status if (!status.ok() && query_status_.ok()) { - query_state_ = QueryState::EXCEPTION; + query_state_ = beeswax::QueryState::EXCEPTION; query_status_ = status; summary_profile_.AddInfoString("Query Status", query_status_.GetDetail()); } @@ -710,12 +713,12 @@ Status ImpalaServer::QueryExecState::UpdateQueryStatus(const Status& status) { Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows) { - DCHECK(query_state_ != QueryState::EXCEPTION); + DCHECK(query_state_ != beeswax::QueryState::EXCEPTION); if (eos_) return Status::OK(); if (request_result_set_ != NULL) { - query_state_ = QueryState::FINISHED; + query_state_ = beeswax::QueryState::FINISHED; int num_rows = 0; const vector<TResultRow>& all_rows = (*(request_result_set_.get())); // max_rows <= 0 means no limit @@ -743,7 +746,7 @@ Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows, if (num_rows_fetched_from_cache >= max_rows) return Status::OK(); } - query_state_ = QueryState::FINISHED; // results will be ready after this call + query_state_ = 
beeswax::QueryState::FINISHED; // results will be ready after this call // Maximum number of rows to be fetched from the coord. int32_t max_coord_rows = max_rows; @@ -847,12 +850,12 @@ Status ImpalaServer::QueryExecState::Cancel(bool check_inflight, const Status* c { lock_guard<mutex> lock(lock_); // If the query is completed or cancelled, no need to update state. - bool already_done = eos_ || query_state_ == QueryState::EXCEPTION; + bool already_done = eos_ || query_state_ == beeswax::QueryState::EXCEPTION; if (!already_done && cause != NULL) { DCHECK(!cause->ok()); UpdateQueryStatus(*cause); query_events_->MarkEvent("Cancelled"); - DCHECK_EQ(query_state_, QueryState::EXCEPTION); + DCHECK_EQ(query_state_, beeswax::QueryState::EXCEPTION); } // Get a copy of the coordinator pointer while holding 'lock_'. coord = coord_.get(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/query-exec-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.h b/be/src/service/query-exec-state.h index 28c8cba..20cd7bf 100644 --- a/be/src/service/query-exec-state.h +++ b/be/src/service/query-exec-state.h @@ -184,8 +184,10 @@ class ImpalaServer::QueryExecState { const RuntimeProfile& summary_profile() const { return summary_profile_; } const TimestampValue& start_time() const { return start_time_; } const TimestampValue& end_time() const { return end_time_; } - const std::string& sql_stmt() const { return query_ctx_.request.stmt; } - const TQueryOptions& query_options() const { return query_ctx_.request.query_options; } + const std::string& sql_stmt() const { return query_ctx_.client_request.stmt; } + const TQueryOptions& query_options() const { + return query_ctx_.client_request.query_options; + } /// Returns 0:0 if this is a root query TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; } 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/testutil/desc-tbl-builder.h ---------------------------------------------------------------------- diff --git a/be/src/testutil/desc-tbl-builder.h b/be/src/testutil/desc-tbl-builder.h index 9ad92b1..9d27092 100644 --- a/be/src/testutil/desc-tbl-builder.h +++ b/be/src/testutil/desc-tbl-builder.h @@ -18,13 +18,18 @@ #ifndef IMPALA_TESTUTIL_ROW_DESC_BUILDER_H_ #define IMPALA_TESTUTIL_ROW_DESC_BUILDER_H_ +#include <vector> + #include "runtime/runtime-state.h" #include "runtime/types.h" +#include "gen-cpp/Descriptors_types.h" namespace impala { +class Frontend; class ObjectPool; class TupleDescBuilder; +class DescriptorTbl; /// Aids in the construction of a DescriptorTbl by declaring tuples and slots /// associated with those tuples. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/udf/udf.cc ---------------------------------------------------------------------- diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc index 9c3b131..cf1fa0f 100644 --- a/be/src/udf/udf.cc +++ b/be/src/udf/udf.cc @@ -93,7 +93,7 @@ class RuntimeState { } const std::string connected_user() const { return ""; } - const std::string effective_user() const { return ""; } + const std::string GetEffectiveUser() const { return ""; } }; } @@ -249,7 +249,7 @@ const char* FunctionContext::user() const { const char* FunctionContext::effective_user() const { if (impl_->state_ == NULL) return NULL; - return impl_->state_->effective_user().c_str(); + return impl_->state_->GetEffectiveUser().c_str(); } FunctionContext::UniqueId FunctionContext::query_id() const { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/util/container-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/container-util.h b/be/src/util/container-util.h index efb4f52..80ec984 100644 --- a/be/src/util/container-util.h +++ b/be/src/util/container-util.h @@ -22,6 
+22,7 @@ #include <map> #include <unordered_map> #include <boost/unordered_map.hpp> +#include <boost/functional/hash.hpp> #include "util/hash-util.h" #include "gen-cpp/Types_types.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/util/thread.h ---------------------------------------------------------------------- diff --git a/be/src/util/thread.h b/be/src/util/thread.h index 4e2b65d..a142a94 100644 --- a/be/src/util/thread.h +++ b/be/src/util/thread.h @@ -103,6 +103,11 @@ class Thread { /// will be unregistered with the ThreadMgr and will not appear in the debug UI. void Join() const { thread_->join(); } + /// Detaches the underlying thread from this Thread object. It's illegal to call + /// Join() after calling Detach(). When the underlying thread finishes execution, + /// it unregisters itself from the ThreadMgr. + void Detach() const { thread_->detach(); } + /// The thread ID assigned to this thread by the operating system. If the OS does not /// support retrieving the tid, returns Thread::INVALID_THREAD_ID. int64_t tid() const { return tid_; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/util/uid-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h index de18464..b532278 100644 --- a/be/src/util/uid-util.h +++ b/be/src/util/uid-util.h @@ -29,14 +29,28 @@ namespace impala { -/// This function must be called 'hash_value' to be picked up by boost. 
-inline std::size_t hash_value(const impala::TUniqueId& id) { +inline std::size_t hash_value(const TUniqueId& id) { std::size_t seed = 0; boost::hash_combine(seed, id.lo); boost::hash_combine(seed, id.hi); return seed; } +} + +/// Hash function for std:: containers +namespace std { + +template<> struct hash<impala::TUniqueId> { + std::size_t operator()(const impala::TUniqueId& id) const { + return impala::hash_value(id); + } +}; + +} + +namespace impala { + inline void UUIDToTUniqueId(const boost::uuids::uuid& uuid, TUniqueId* unique_id) { memcpy(&(unique_id->hi), &uuid.data[0], 8); memcpy(&(unique_id->lo), &uuid.data[8], 8); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 5e79c07..bb4251b 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -267,8 +267,7 @@ struct TClientRequest { // TODO: Separate into FE/BE initialized vars. struct TQueryCtx { // Client request containing stmt to execute and query options. - // TODO: rename to client_request, we have too many requests - 1: required TClientRequest request + 1: required TClientRequest client_request // A globally unique id assigned to the entire query in the BE. // The bottom 4 bytes are 0 (for details see be/src/util/uid-util.h). 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java index e720867..d57c25f 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java +++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java @@ -72,7 +72,7 @@ public class AnalysisContext { // expr rewrites can be disabled via a query option. When rewrites are enabled // BetweenPredicates should be rewritten first to help trigger other rules. List<ExprRewriteRule> rules = Lists.newArrayList(BetweenToCompoundRule.INSTANCE); - if (queryCtx.getRequest().getQuery_options().enable_expr_rewrites) { + if (queryCtx.getClient_request().getQuery_options().enable_expr_rewrites) { rules.add(FoldConstantsRule.INSTANCE); rules.add(ExtractCommonConjunctRule.INSTANCE); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/fe/src/main/java/org/apache/impala/analysis/Analyzer.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java index 61d1c20..8bea3aa 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java @@ -2285,7 +2285,7 @@ public class Analyzer { public User getUser() { return user_; } public TQueryCtx getQueryCtx() { return globalState_.queryCtx; } public TQueryOptions getQueryOptions() { - return globalState_.queryCtx.getRequest().getQuery_options(); + return globalState_.queryCtx.client_request.getQuery_options(); } public AuthorizationConfig getAuthzConfig() { return globalState_.authzConfig; } public ListMap<TNetworkAddress> getHostIndex() { return 
globalState_.hostIndex; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java index 92b7106..dac4e8f 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java +++ b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java @@ -367,10 +367,10 @@ public class ColumnLineageGraph { Preconditions.checkNotNull(analyzer); Preconditions.checkState(analyzer.isRootAnalyzer()); TQueryCtx queryCtx = analyzer.getQueryCtx(); - if (queryCtx.request.isSetRedacted_stmt()) { - queryStr_ = queryCtx.request.redacted_stmt; + if (queryCtx.client_request.isSetRedacted_stmt()) { + queryStr_ = queryCtx.client_request.redacted_stmt; } else { - queryStr_ = queryCtx.request.stmt; + queryStr_ = queryCtx.client_request.stmt; } Preconditions.checkNotNull(queryStr_); timestamp_ = queryCtx.start_unix_millis / 1000; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java index 74399d0..e3f4e6a 100644 --- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java @@ -641,7 +641,7 @@ public class SelectStmt extends QueryStmt { // Optionally rewrite all count(distinct <expr>) into equivalent NDV() calls. 
ExprSubstitutionMap ndvSmap = null; - if (analyzer.getQueryCtx().getRequest().query_options.appx_count_distinct) { + if (analyzer.getQueryCtx().client_request.query_options.appx_count_distinct) { ndvSmap = new ExprSubstitutionMap(); for (FunctionCallExpr aggExpr: aggExprs) { if (!aggExpr.isDistinct()
