Repository: incubator-impala Updated Branches: refs/heads/master 0ff1e6e8d -> 663285244
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/runtime-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index 0d7f262..c7b916b 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -32,14 +32,16 @@ #include "exprs/expr.h" #include "exprs/scalar-fn-call.h" #include "runtime/buffered-block-mgr.h" -#include "runtime/exec-env.h" -#include "runtime/descriptors.h" +#include "runtime/bufferpool/reservation-tracker.h" #include "runtime/data-stream-mgr.h" #include "runtime/data-stream-recvr.h" +#include "runtime/descriptors.h" +#include "runtime/exec-env.h" #include "runtime/mem-tracker.h" +#include "runtime/query-state.h" #include "runtime/runtime-filter-bank.h" #include "runtime/timestamp-value.h" -#include "runtime/query-state.h" +#include "util/auth-util.h" // for GetEffectiveUser() #include "util/bitmap.h" #include "util/cpu-info.h" #include "util/debug-util.h" @@ -48,7 +50,6 @@ #include "util/jni-util.h" #include "util/mem-info.h" #include "util/pretty-printer.h" -#include "util/auth-util.h" // for GetEffectiveUser() #include "common/names.h" @@ -83,6 +84,7 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag exec_env_(exec_env), profile_(obj_pool_.get(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)), query_mem_tracker_(query_state_->query_mem_tracker()), + instance_buffer_reservation_(nullptr), is_cancelled_(false), root_node_id_(-1) { Init(); @@ -100,6 +102,7 @@ RuntimeState::RuntimeState( profile_(obj_pool_.get(), "<unnamed>"), query_mem_tracker_(MemTracker::CreateQueryMemTracker( query_id(), query_options(), request_pool, obj_pool_.get())), + instance_buffer_reservation_(nullptr), is_cancelled_(false), root_node_id_(-1) { Init(); @@ -113,10 +116,8 @@ void RuntimeState::Init() { SCOPED_TIMER(profile_.total_time_counter()); // Register with the 
thread mgr - if (exec_env_ != NULL) { - resource_pool_ = exec_env_->thread_mgr()->RegisterPool(); - DCHECK(resource_pool_ != NULL); - } + resource_pool_ = exec_env_->thread_mgr()->RegisterPool(); + DCHECK(resource_pool_ != NULL); total_thread_statistics_ = ADD_THREAD_COUNTERS(runtime_profile(), "TotalThreads"); total_storage_wait_timer_ = ADD_TIMER(runtime_profile(), "TotalStorageWaitTime"); @@ -125,6 +126,13 @@ void RuntimeState::Init() { instance_mem_tracker_.reset(new MemTracker( runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker_)); + + if (query_state_ != nullptr && exec_env_->buffer_pool() != nullptr) { + instance_buffer_reservation_ = obj_pool_->Add(new ReservationTracker); + instance_buffer_reservation_->InitChildTracker(&profile_, + query_state_->buffer_reservation(), instance_mem_tracker_.get(), + numeric_limits<int64_t>::max()); + } } void RuntimeState::InitFilterBank() { @@ -291,6 +299,9 @@ void RuntimeState::ReleaseResources() { block_mgr_.reset(); // Release any block mgr memory, if this is the last reference. codegen_.reset(); // Release any memory associated with codegen. + // Release the reservation, which should be unused at this point. + if (instance_buffer_reservation_ != nullptr) instance_buffer_reservation_->Close(); + // 'query_mem_tracker_' must be valid as long as 'instance_mem_tracker_' is so // delete 'instance_mem_tracker_' first.
// LogUsage() walks the MemTracker tree top-down when the memory limit is exceeded, so http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/runtime-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index 97014aa..009fee5 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -41,6 +41,7 @@ class Expr; class LlvmCodeGen; class MemTracker; class ObjectPool; +class ReservationTracker; class RuntimeFilterBank; class ScalarFnCall; class Status; @@ -128,6 +129,9 @@ class RuntimeState { DiskIoMgr* io_mgr(); MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); } MemTracker* query_mem_tracker() { return query_mem_tracker_; } + ReservationTracker* instance_buffer_reservation() { + return instance_buffer_reservation_; + } ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; } FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; } @@ -385,6 +389,11 @@ class RuntimeState { /// Memory usage of this fragment instance, a child of 'query_mem_tracker_'. boost::scoped_ptr<MemTracker> instance_mem_tracker_; + /// Buffer reservation for this fragment instance - a child of the query buffer + /// reservation. Non-NULL if 'query_state_' is not NULL and ExecEnv::buffer_pool_ + /// was created by a backend test. Owned by 'obj_pool_'. 
+ ReservationTracker* instance_buffer_reservation_; + /// if true, execution should stop with a CANCELLED status bool is_cancelled_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/test-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc index 1c28acd..026b2ee 100644 --- a/be/src/runtime/test-env.cc +++ b/be/src/runtime/test-env.cc @@ -17,7 +17,11 @@ #include "runtime/test-env.h" +#include <limits> + +#include "runtime/buffered-block-mgr.h" #include "runtime/query-exec-mgr.h" +#include "runtime/tmp-file-mgr.h" #include "util/disk-info.h" #include "util/impalad-metrics.h" @@ -28,46 +32,56 @@ #include "common/names.h" using boost::scoped_ptr; +using std::numeric_limits; namespace impala { scoped_ptr<MetricGroup> TestEnv::static_metrics_; -TestEnv::TestEnv() { +TestEnv::TestEnv() + : have_tmp_file_mgr_args_(false), + buffer_pool_min_buffer_len_(1024), + buffer_pool_capacity_(0) {} + +Status TestEnv::Init() { if (static_metrics_ == NULL) { static_metrics_.reset(new MetricGroup("test-env-static-metrics")); ImpaladMetrics::CreateMetrics(static_metrics_.get()); } + exec_env_.reset(new ExecEnv); - exec_env_->InitForFeTests(); - io_mgr_tracker_.reset(new MemTracker(-1)); - Status status = exec_env_->disk_io_mgr()->Init(io_mgr_tracker_.get()); - CHECK(status.ok()) << status.msg().msg(); - InitMetrics(); - tmp_file_mgr_.reset(new TmpFileMgr); - status = tmp_file_mgr_->Init(metrics_.get()); - CHECK(status.ok()) << status.msg().msg(); + // Populate the ExecEnv state that the backend tests need. 
+ exec_env_->mem_tracker_.reset(new MemTracker(-1, "Process")); + RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init(exec_env_->process_mem_tracker())); + exec_env_->metrics_.reset(new MetricGroup("test-env-metrics")); + exec_env_->tmp_file_mgr_.reset(new TmpFileMgr); + if (have_tmp_file_mgr_args_) { + RETURN_IF_ERROR( + tmp_file_mgr()->InitCustom(tmp_dirs_, one_tmp_dir_per_device_, metrics())); + } else { + RETURN_IF_ERROR(tmp_file_mgr()->Init(metrics())); + } + exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_); + return Status::OK(); } -void TestEnv::InitMetrics() { - metrics_.reset(new MetricGroup("test-env-metrics")); +void TestEnv::SetTmpFileMgrArgs( + const std::vector<std::string>& tmp_dirs, bool one_dir_per_device) { + have_tmp_file_mgr_args_ = true; + tmp_dirs_ = tmp_dirs; + one_tmp_dir_per_device_ = one_dir_per_device; } -void TestEnv::InitTmpFileMgr(const vector<string>& tmp_dirs, bool one_dir_per_device) { - // Need to recreate metrics to avoid error when registering metric twice. - InitMetrics(); - tmp_file_mgr_.reset(new TmpFileMgr); - Status status = tmp_file_mgr_->InitCustom(tmp_dirs, one_dir_per_device, metrics_.get()); - CHECK(status.ok()) << status.msg().msg(); +void TestEnv::SetBufferPoolArgs(int64_t min_buffer_len, int64_t capacity) { + buffer_pool_min_buffer_len_ = min_buffer_len; + buffer_pool_capacity_ = capacity; } TestEnv::~TestEnv() { // Queries must be torn down first since they are dependent on global state. 
TearDownQueries(); + exec_env_->disk_io_mgr_.reset(); exec_env_.reset(); - io_mgr_tracker_.reset(); - tmp_file_mgr_.reset(); - metrics_.reset(); } void TestEnv::TearDownQueries() { @@ -93,8 +107,8 @@ int64_t TestEnv::TotalQueryMemoryConsumption() { return total; } -Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_size, - const TQueryOptions* query_options, RuntimeState** runtime_state) { +Status TestEnv::CreateQueryState( + int64_t query_id, const TQueryOptions* query_options, RuntimeState** runtime_state) { TQueryCtx query_ctx; if (query_options != nullptr) query_ctx.client_request.query_options = *query_options; query_ctx.query_id.hi = 0; @@ -110,13 +124,20 @@ Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_si new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get())); runtime_states_.push_back(rs); + *runtime_state = rs; + return Status::OK(); +} + +Status TestEnv::CreateQueryStateWithBlockMgr(int64_t query_id, int max_buffers, + int block_size, const TQueryOptions* query_options, RuntimeState** runtime_state) { + RETURN_IF_ERROR(CreateQueryState(query_id, query_options, runtime_state)); + shared_ptr<BufferedBlockMgr> mgr; - RETURN_IF_ERROR(BufferedBlockMgr::Create(rs, qs->query_mem_tracker(), - rs->runtime_profile(), tmp_file_mgr_.get(), + RETURN_IF_ERROR(BufferedBlockMgr::Create(*runtime_state, + (*runtime_state)->query_state()->query_mem_tracker(), + (*runtime_state)->runtime_profile(), tmp_file_mgr(), CalculateMemLimit(max_buffers, block_size), block_size, &mgr)); - rs->set_block_mgr(mgr); - - if (runtime_state != nullptr) *runtime_state = rs; + (*runtime_state)->set_block_mgr(mgr); return Status::OK(); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/test-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h index 3f2eaec..30e9309 100644 --- 
a/be/src/runtime/test-env.h +++ b/be/src/runtime/test-env.h @@ -18,7 +18,6 @@ #ifndef IMPALA_RUNTIME_TEST_ENV #define IMPALA_RUNTIME_TEST_ENV -#include "runtime/buffered-block-mgr.h" #include "runtime/disk-io-mgr.h" #include "runtime/exec-env.h" #include "runtime/fragment-instance-state.h" @@ -28,52 +27,69 @@ namespace impala { -/// Helper testing class that creates an environment with a buffered-block-mgr similar -/// to the one Impala's runtime is using. Only one TestEnv can be active at a time, -/// because it replaces the global ExecEnv singleton. +/// Helper testing class that creates an environment with runtime memory management +/// similar to the one used by the Impala runtime. Only one TestEnv can be active at a +/// time, because it modifies the global ExecEnv singleton. class TestEnv { public: TestEnv(); ~TestEnv(); - /// Reinitialize tmp_file_mgr with custom configuration. Only valid to call before - /// query states have been created. - void InitTmpFileMgr(const std::vector<std::string>& tmp_dirs, bool one_dir_per_device); + /// Set custom configuration for TmpFileMgr. Only has effect if called before Init(). + /// If not called, the default configuration is used. + void SetTmpFileMgrArgs( + const std::vector<std::string>& tmp_dirs, bool one_dir_per_device); - /// Create a QueryState and a RuntimeState for a query with a new block manager and - /// the given query options. The states are owned by the TestEnv. Returns an error if - /// CreateQueryState() has been called with the same query ID already. - /// If non-null, 'runtime_state' are set to the newly created RuntimeState. The - /// QueryState can be obtained via 'runtime_state'. - Status CreateQueryState(int64_t query_id, int max_buffers, int block_size, - const TQueryOptions* query_options, RuntimeState** runtime_state); + /// Set configuration for BufferPool. Only has effect if called before Init(). + /// If not called, a buffer pool with no capacity is created. 
+ void SetBufferPoolArgs(int64_t min_buffer_len, int64_t capacity); + + /// Initialize the TestEnv with the specified arguments. + Status Init(); + + /// Create a QueryState and a RuntimeState for a query with the given query options. + /// The states are owned by the TestEnv. Returns an error if CreateQueryState() has + /// been called with the same query ID already. 'runtime_state' is set to the newly + /// created RuntimeState. The QueryState can be obtained via 'runtime_state'. + Status CreateQueryState( + int64_t query_id, const TQueryOptions* query_options, RuntimeState** runtime_state); + /// Same as CreateQueryState() but also creates a BufferedBlockMgr with the provided + /// parameters. If 'max_buffers' is -1, there is no limit, otherwise the limit is + /// max_buffers * block_size. + Status CreateQueryStateWithBlockMgr(int64_t query_id, int max_buffers, int block_size, + const TQueryOptions* query_options, RuntimeState** runtime_state); /// Destroy all query states and associated RuntimeStates, BufferedBlockMgrs, /// etc, that were created since the last TearDownQueries() call. void TearDownQueries(); /// Calculate memory limit accounting for overflow and negative values. /// If max_buffers is -1, no memory limit will apply. - int64_t CalculateMemLimit(int max_buffers, int block_size); + int64_t CalculateMemLimit(int max_buffers, int page_len); /// Return total of mem tracker consumption for all queries. int64_t TotalQueryMemoryConsumption(); ExecEnv* exec_env() { return exec_env_.get(); } - MemTracker* io_mgr_tracker() { return io_mgr_tracker_.get(); } - MetricGroup* metrics() { return metrics_.get(); } - TmpFileMgr* tmp_file_mgr() { return tmp_file_mgr_.get(); } + MetricGroup* metrics() { return exec_env_->metrics(); } + TmpFileMgr* tmp_file_mgr() { return exec_env_->tmp_file_mgr(); } private: /// Recreate global metric groups. void InitMetrics(); + /// Arguments for TmpFileMgr, used in Init(). 
+ bool have_tmp_file_mgr_args_; + std::vector<std::string> tmp_dirs_; + bool one_tmp_dir_per_device_; + + /// Arguments for BufferPool, used in Init(). + int64_t buffer_pool_min_buffer_len_; + int64_t buffer_pool_capacity_; + /// Global state for test environment. static boost::scoped_ptr<MetricGroup> static_metrics_; boost::scoped_ptr<ExecEnv> exec_env_; - boost::scoped_ptr<MemTracker> io_mgr_tracker_; - boost::scoped_ptr<MetricGroup> metrics_; - boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_; /// Per-query states. TestEnv holds 1 refcount per QueryState in this map. std::vector<QueryState*> query_states_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/tmp-file-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc index 59b5af4..b220fff 100644 --- a/be/src/runtime/tmp-file-mgr-test.cc +++ b/be/src/runtime/tmp-file-mgr-test.cc @@ -52,6 +52,7 @@ class TmpFileMgrTest : public ::testing::Test { metrics_.reset(new MetricGroup("tmp-file-mgr-test")); profile_ = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "tmp-file-mgr-test")); test_env_.reset(new TestEnv); + ASSERT_OK(test_env_->Init()); cb_counter_ = 0; // Reset query options that are modified by tests.
