Repository: incubator-impala
Updated Branches:
  refs/heads/master 0ff1e6e8d -> 663285244


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 0d7f262..c7b916b 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -32,14 +32,16 @@
 #include "exprs/expr.h"
 #include "exprs/scalar-fn-call.h"
 #include "runtime/buffered-block-mgr.h"
-#include "runtime/exec-env.h"
-#include "runtime/descriptors.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/data-stream-recvr.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "runtime/runtime-filter-bank.h"
 #include "runtime/timestamp-value.h"
-#include "runtime/query-state.h"
+#include "util/auth-util.h" // for GetEffectiveUser()
 #include "util/bitmap.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
@@ -48,7 +50,6 @@
 #include "util/jni-util.h"
 #include "util/mem-info.h"
 #include "util/pretty-printer.h"
-#include "util/auth-util.h" // for GetEffectiveUser()
 
 #include "common/names.h"
 
@@ -83,6 +84,7 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
     exec_env_(exec_env),
     profile_(obj_pool_.get(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)),
     query_mem_tracker_(query_state_->query_mem_tracker()),
+    instance_buffer_reservation_(nullptr),
     is_cancelled_(false),
     root_node_id_(-1) {
   Init();
@@ -100,6 +102,7 @@ RuntimeState::RuntimeState(
     profile_(obj_pool_.get(), "<unnamed>"),
     query_mem_tracker_(MemTracker::CreateQueryMemTracker(
         query_id(), query_options(), request_pool, obj_pool_.get())),
+    instance_buffer_reservation_(nullptr),
     is_cancelled_(false),
     root_node_id_(-1) {
   Init();
@@ -113,10 +116,8 @@ void RuntimeState::Init() {
   SCOPED_TIMER(profile_.total_time_counter());
 
   // Register with the thread mgr
-  if (exec_env_ != NULL) {
-    resource_pool_ = exec_env_->thread_mgr()->RegisterPool();
-    DCHECK(resource_pool_ != NULL);
-  }
+  resource_pool_ = exec_env_->thread_mgr()->RegisterPool();
+  DCHECK(resource_pool_ != NULL);
 
   total_thread_statistics_ = ADD_THREAD_COUNTERS(runtime_profile(), "TotalThreads");
   total_storage_wait_timer_ = ADD_TIMER(runtime_profile(), "TotalStorageWaitTime");
@@ -125,6 +126,13 @@ void RuntimeState::Init() {
 
   instance_mem_tracker_.reset(new MemTracker(
       runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker_));
+
+  if (query_state_ != nullptr && exec_env_->buffer_pool() != nullptr) {
+    instance_buffer_reservation_ = obj_pool_->Add(new ReservationTracker);
+    instance_buffer_reservation_->InitChildTracker(&profile_,
+        query_state_->buffer_reservation(), instance_mem_tracker_.get(),
+        numeric_limits<int64_t>::max());
+  }
 }
 
 void RuntimeState::InitFilterBank() {
@@ -291,6 +299,9 @@ void RuntimeState::ReleaseResources() {
   block_mgr_.reset(); // Release any block mgr memory, if this is the last reference.
   codegen_.reset(); // Release any memory associated with codegen.
 
+  // Release the reservation, which should be unused at this point.
+  if (instance_buffer_reservation_ != nullptr) instance_buffer_reservation_->Close();
+
   // 'query_mem_tracker_' must be valid as long as 'instance_mem_tracker_' is so
   // delete 'instance_mem_tracker_' first.
   // LogUsage() walks the MemTracker tree top-down when the memory limit is exceeded, so

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 97014aa..009fee5 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -41,6 +41,7 @@ class Expr;
 class LlvmCodeGen;
 class MemTracker;
 class ObjectPool;
+class ReservationTracker;
 class RuntimeFilterBank;
 class ScalarFnCall;
 class Status;
@@ -128,6 +129,9 @@ class RuntimeState {
   DiskIoMgr* io_mgr();
   MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); }
   MemTracker* query_mem_tracker() { return query_mem_tracker_; }
+  ReservationTracker* instance_buffer_reservation() {
+    return instance_buffer_reservation_;
+  }
   ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
 
   FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; }
@@ -385,6 +389,11 @@ class RuntimeState {
   /// Memory usage of this fragment instance, a child of 'query_mem_tracker_'.
   boost::scoped_ptr<MemTracker> instance_mem_tracker_;
 
+  /// Buffer reservation for this fragment instance - a child of the query buffer
+  /// reservation. Non-NULL if 'query_state_' is not NULL and ExecEnv::buffer_pool_
+  /// was created by a backend test. Owned by 'obj_pool_'.
+  ReservationTracker* instance_buffer_reservation_;
+
   /// if true, execution should stop with a CANCELLED status
   bool is_cancelled_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 1c28acd..026b2ee 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -17,7 +17,11 @@
 
 #include "runtime/test-env.h"
 
+#include <limits>
+
+#include "runtime/buffered-block-mgr.h"
 #include "runtime/query-exec-mgr.h"
+#include "runtime/tmp-file-mgr.h"
 #include "util/disk-info.h"
 #include "util/impalad-metrics.h"
 
@@ -28,46 +32,56 @@
 #include "common/names.h"
 
 using boost::scoped_ptr;
+using std::numeric_limits;
 
 namespace impala {
 
 scoped_ptr<MetricGroup> TestEnv::static_metrics_;
 
-TestEnv::TestEnv() {
+TestEnv::TestEnv()
+  : have_tmp_file_mgr_args_(false),
+    buffer_pool_min_buffer_len_(1024),
+    buffer_pool_capacity_(0) {}
+
+Status TestEnv::Init() {
   if (static_metrics_ == NULL) {
     static_metrics_.reset(new MetricGroup("test-env-static-metrics"));
     ImpaladMetrics::CreateMetrics(static_metrics_.get());
   }
+
   exec_env_.reset(new ExecEnv);
-  exec_env_->InitForFeTests();
-  io_mgr_tracker_.reset(new MemTracker(-1));
-  Status status = exec_env_->disk_io_mgr()->Init(io_mgr_tracker_.get());
-  CHECK(status.ok()) << status.msg().msg();
-  InitMetrics();
-  tmp_file_mgr_.reset(new TmpFileMgr);
-  status = tmp_file_mgr_->Init(metrics_.get());
-  CHECK(status.ok()) << status.msg().msg();
+  // Populate the ExecEnv state that the backend tests need.
+  exec_env_->mem_tracker_.reset(new MemTracker(-1, "Process"));
+  RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init(exec_env_->process_mem_tracker()));
+  exec_env_->metrics_.reset(new MetricGroup("test-env-metrics"));
+  exec_env_->tmp_file_mgr_.reset(new TmpFileMgr);
+  if (have_tmp_file_mgr_args_) {
+    RETURN_IF_ERROR(
+        tmp_file_mgr()->InitCustom(tmp_dirs_, one_tmp_dir_per_device_, metrics()));
+  } else {
+    RETURN_IF_ERROR(tmp_file_mgr()->Init(metrics()));
+  }
+  exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_);
+  return Status::OK();
 }
 
-void TestEnv::InitMetrics() {
-  metrics_.reset(new MetricGroup("test-env-metrics"));
+void TestEnv::SetTmpFileMgrArgs(
+    const std::vector<std::string>& tmp_dirs, bool one_dir_per_device) {
+  have_tmp_file_mgr_args_ = true;
+  tmp_dirs_ = tmp_dirs;
+  one_tmp_dir_per_device_ = one_dir_per_device;
 }
 
-void TestEnv::InitTmpFileMgr(const vector<string>& tmp_dirs, bool one_dir_per_device) {
-  // Need to recreate metrics to avoid error when registering metric twice.
-  InitMetrics();
-  tmp_file_mgr_.reset(new TmpFileMgr);
-  Status status = tmp_file_mgr_->InitCustom(tmp_dirs, one_dir_per_device, metrics_.get());
-  CHECK(status.ok()) << status.msg().msg();
+void TestEnv::SetBufferPoolArgs(int64_t min_buffer_len, int64_t capacity) {
+  buffer_pool_min_buffer_len_ = min_buffer_len;
+  buffer_pool_capacity_ = capacity;
 }
 
 TestEnv::~TestEnv() {
   // Queries must be torn down first since they are dependent on global state.
   TearDownQueries();
+  exec_env_->disk_io_mgr_.reset();
   exec_env_.reset();
-  io_mgr_tracker_.reset();
-  tmp_file_mgr_.reset();
-  metrics_.reset();
 }
 
 void TestEnv::TearDownQueries() {
@@ -93,8 +107,8 @@ int64_t TestEnv::TotalQueryMemoryConsumption() {
   return total;
 }
 
-Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_size,
-    const TQueryOptions* query_options, RuntimeState** runtime_state) {
+Status TestEnv::CreateQueryState(
+    int64_t query_id, const TQueryOptions* query_options, RuntimeState** runtime_state) {
   TQueryCtx query_ctx;
   if (query_options != nullptr) query_ctx.client_request.query_options = *query_options;
   query_ctx.query_id.hi = 0;
@@ -110,13 +124,20 @@ Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_si
       new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get()));
   runtime_states_.push_back(rs);
 
+  *runtime_state = rs;
+  return Status::OK();
+}
+
+Status TestEnv::CreateQueryStateWithBlockMgr(int64_t query_id, int max_buffers,
+    int block_size, const TQueryOptions* query_options, RuntimeState** runtime_state) {
+  RETURN_IF_ERROR(CreateQueryState(query_id, query_options, runtime_state));
+
   shared_ptr<BufferedBlockMgr> mgr;
-  RETURN_IF_ERROR(BufferedBlockMgr::Create(rs, qs->query_mem_tracker(),
-      rs->runtime_profile(), tmp_file_mgr_.get(),
+  RETURN_IF_ERROR(BufferedBlockMgr::Create(*runtime_state,
+      (*runtime_state)->query_state()->query_mem_tracker(),
+      (*runtime_state)->runtime_profile(), tmp_file_mgr(),
       CalculateMemLimit(max_buffers, block_size), block_size, &mgr));
-  rs->set_block_mgr(mgr);
-
-  if (runtime_state != nullptr) *runtime_state = rs;
+  (*runtime_state)->set_block_mgr(mgr);
   return Status::OK();
 }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/test-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index 3f2eaec..30e9309 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -18,7 +18,6 @@
 #ifndef IMPALA_RUNTIME_TEST_ENV
 #define IMPALA_RUNTIME_TEST_ENV
 
-#include "runtime/buffered-block-mgr.h"
 #include "runtime/disk-io-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
@@ -28,52 +27,69 @@
 
 namespace impala {
 
-/// Helper testing class that creates an environment with a buffered-block-mgr similar
-/// to the one Impala's runtime is using. Only one TestEnv can be active at a time,
-/// because it replaces the global ExecEnv singleton.
+/// Helper testing class that creates an environment with runtime memory management
+/// similar to the one used by the Impala runtime. Only one TestEnv can be active at a
+/// time, because it modifies the global ExecEnv singleton.
 class TestEnv {
  public:
   TestEnv();
   ~TestEnv();
 
-  /// Reinitialize tmp_file_mgr with custom configuration. Only valid to call before
-  /// query states have been created.
-  void InitTmpFileMgr(const std::vector<std::string>& tmp_dirs, bool one_dir_per_device);
+  /// Set custom configuration for TmpFileMgr. Only has effect if called before Init().
+  /// If not called, the default configuration is used.
+  void SetTmpFileMgrArgs(
+      const std::vector<std::string>& tmp_dirs, bool one_dir_per_device);
 
-  /// Create a QueryState and a RuntimeState for a query with a new block manager and
-  /// the given query options. The states are owned by the TestEnv. Returns an error if
-  /// CreateQueryState() has been called with the same query ID already.
-  /// If non-null, 'runtime_state' are set to the newly created RuntimeState. The
-  /// QueryState can be obtained via 'runtime_state'.
-  Status CreateQueryState(int64_t query_id, int max_buffers, int block_size,
-      const TQueryOptions* query_options, RuntimeState** runtime_state);
+  /// Set configuration for BufferPool. Only has effect if called before Init().
+  /// If not called, a buffer pool with no capacity is created.
+  void SetBufferPoolArgs(int64_t min_buffer_len, int64_t capacity);
+
+  /// Initialize the TestEnv with the specified arguments.
+  Status Init();
+
+  /// Create a QueryState and a RuntimeState for a query with the given query options.
+  /// The states are owned by the TestEnv. Returns an error if CreateQueryState() has
+  /// been called with the same query ID already. 'runtime_state' is set to the newly
+  /// created RuntimeState. The QueryState can be obtained via 'runtime_state'.
+  Status CreateQueryState(
+      int64_t query_id, const TQueryOptions* query_options, RuntimeState** runtime_state);
 
+  /// Same as CreateQueryState() but also creates a BufferedBlockMgr with the provided
+  /// parameters. If 'max_buffers' is -1, there is no limit, otherwise the limit is
+  /// max_buffers * block_size.
+  Status CreateQueryStateWithBlockMgr(int64_t query_id, int max_buffers, int block_size,
+      const TQueryOptions* query_options, RuntimeState** runtime_state);
   /// Destroy all query states and associated RuntimeStates, BufferedBlockMgrs,
   /// etc, that were created since the last TearDownQueries() call.
   void TearDownQueries();
 
   /// Calculate memory limit accounting for overflow and negative values.
   /// If max_buffers is -1, no memory limit will apply.
-  int64_t CalculateMemLimit(int max_buffers, int block_size);
+  int64_t CalculateMemLimit(int max_buffers, int page_len);
 
   /// Return total of mem tracker consumption for all queries.
   int64_t TotalQueryMemoryConsumption();
 
   ExecEnv* exec_env() { return exec_env_.get(); }
-  MemTracker* io_mgr_tracker() { return io_mgr_tracker_.get(); }
-  MetricGroup* metrics() { return metrics_.get(); }
-  TmpFileMgr* tmp_file_mgr() { return tmp_file_mgr_.get(); }
+  MetricGroup* metrics() { return exec_env_->metrics(); }
+  TmpFileMgr* tmp_file_mgr() { return exec_env_->tmp_file_mgr(); }
 
  private:
   /// Recreate global metric groups.
   void InitMetrics();
 
+  /// Arguments for TmpFileMgr, used in Init().
+  bool have_tmp_file_mgr_args_;
+  std::vector<std::string> tmp_dirs_;
+  bool one_tmp_dir_per_device_;
+
+  /// Arguments for BufferPool, used in Init().
+  int64_t buffer_pool_min_buffer_len_;
+  int64_t buffer_pool_capacity_;
+
   /// Global state for test environment.
   static boost::scoped_ptr<MetricGroup> static_metrics_;
   boost::scoped_ptr<ExecEnv> exec_env_;
-  boost::scoped_ptr<MemTracker> io_mgr_tracker_;
-  boost::scoped_ptr<MetricGroup> metrics_;
-  boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_;
 
   /// Per-query states. TestEnv holds 1 refcount per QueryState in this map.
   std::vector<QueryState*> query_states_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 59b5af4..b220fff 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -52,6 +52,7 @@ class TmpFileMgrTest : public ::testing::Test {
     metrics_.reset(new MetricGroup("tmp-file-mgr-test"));
     profile_ = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "tmp-file-mgr-test"));
     test_env_.reset(new TestEnv);
+    ASSERT_OK(test_env_->Init());
     cb_counter_ = 0;
 
     // Reset query options that are modified by tests.

Reply via email to