Repository: incubator-impala
Updated Branches:
  refs/heads/master f5c7668f9 -> 6cc296ec8


IMPALA-4104: add DCHECK to ConsumeLocal() and fix tests

The TestEnv used for the backend tests did not connect up the
MemTracker hierarchy in the expected way. This caused the (valid)
DCHECK added to ConsumeLocal() to be triggered in backend tests.

This change fixes TestEnv to set up MemTrackers with the normal
hierarchy, as shown below, and fixes the tests to deal with the fallout
of that.

(Process)
 |
(Query)----------
 |              |
(Block Mgr) (Fragment instance)

Change-Id: Iadcbe96a9f1bf19872436211b049cebf39b0afe7
Reviewed-on: http://gerrit.cloudera.org:8080/4531
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4849e586
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4849e586
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4849e586

Branch: refs/heads/master
Commit: 4849e5868e959f87db7a874a77c7f2b5711fc7b1
Parents: f5c7668
Author: Tim Armstrong <[email protected]>
Authored: Thu Sep 8 21:09:53 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Tue Sep 27 19:57:44 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hash-table-test.cc               |  10 +-
 be/src/runtime/buffered-block-mgr-test.cc    | 217 +++++++++++-----------
 be/src/runtime/buffered-block-mgr.h          |   1 +
 be/src/runtime/buffered-tuple-stream-test.cc |   6 +-
 be/src/runtime/mem-tracker.h                 |   1 +
 be/src/runtime/test-env.cc                   |  14 +-
 be/src/runtime/test-env.h                    |   5 +-
 7 files changed, 135 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4849e586/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 1066d2c..d903420 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -184,11 +184,13 @@ class HashTableTest : public testing::Test {
   bool CreateHashTable(bool quadratic, int64_t initial_num_buckets,
       scoped_ptr<HashTable>* table, int block_size = 8 * 1024 * 1024,
       int max_num_blocks = 100, int reserved_blocks = 10) {
-    EXPECT_TRUE(test_env_->CreateQueryState(0, max_num_blocks, block_size,
-        &runtime_state_).ok());
+    EXPECT_OK(
+        test_env_->CreateQueryState(0, max_num_blocks, block_size, &runtime_state_));
+    MemTracker* client_tracker = pool_.Add(
+        new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
     BufferedBlockMgr::Client* client;
-    EXPECT_TRUE(runtime_state_->block_mgr()->RegisterClient("", reserved_blocks, false,
-        &tracker_, runtime_state_, &client).ok());
+    EXPECT_OK(runtime_state_->block_mgr()->RegisterClient(
+        "", reserved_blocks, false, client_tracker, runtime_state_, &client));
 
     // Initial_num_buckets must be a power of two.
     EXPECT_EQ(initial_num_buckets, BitUtil::RoundUpToPowerOfTwo(initial_num_buckets));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4849e586/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc
index 5eb1f8c..ae822bb 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -15,27 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <boost/scoped_ptr.hpp>
-#include <boost/bind.hpp>
-#include <boost/thread/thread.hpp>
-#include <boost/filesystem.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
 #include <gutil/strings/substitute.h>
 #include <sys/stat.h>
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/filesystem.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/thread.hpp>
 
-#include "testutil/gtest-util.h"
-#include "common/init.h"
 #include "codegen/llvm-codegen.h"
-#include "runtime/disk-io-mgr.h"
+#include "common/init.h"
+#include "common/object-pool.h"
 #include "runtime/buffered-block-mgr.h"
+#include "runtime/disk-io-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
 #include "runtime/test-env.h"
 #include "runtime/tmp-file-mgr.h"
 #include "service/fe-support.h"
-#include "util/disk-info.h"
+#include "testutil/gtest-util.h"
 #include "util/cpu-info.h"
+#include "util/disk-info.h"
 #include "util/filesystem-util.h"
 #include "util/promise.h"
 #include "util/test-info.h"
@@ -73,13 +74,11 @@ class BufferedBlockMgrTest : public ::testing::Test {
 
   virtual void SetUp() {
     test_env_.reset(new TestEnv());
-    client_tracker_.reset(new MemTracker(-1));
   }
 
   virtual void TearDown() {
     TearDownMgrs();
     test_env_.reset();
-    client_tracker_.reset();
 
     // Tests modify permissions, so make sure we can delete if they didn't clean up.
     for (int i = 0; i < created_tmp_dirs_.size(); ++i) {
@@ -87,6 +86,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
     }
     FileSystemUtil::RemovePaths(created_tmp_dirs_);
     created_tmp_dirs_.clear();
+    pool_.Clear();
   }
 
   /// Reinitialize test_env_ to have multiple temporary directories.
@@ -106,8 +106,8 @@ class BufferedBlockMgrTest : public ::testing::Test {
   }
 
   static void ValidateBlock(BufferedBlockMgr::Block* block, int32_t data) {
-    EXPECT_TRUE(block->valid_data_len() == sizeof(int32_t));
-    EXPECT_TRUE(*reinterpret_cast<int32_t*>(block->buffer()) == data);
+    EXPECT_EQ(block->valid_data_len(), sizeof(int32_t));
+    EXPECT_EQ(*reinterpret_cast<int32_t*>(block->buffer()), data);
   }
 
   static int32_t* MakeRandomSizeData(BufferedBlockMgr::Block* block) {
@@ -149,15 +149,22 @@ class BufferedBlockMgrTest : public ::testing::Test {
     return state->block_mgr();
   }
 
+  /// Create a new client tracker as a child of the RuntimeState's instance tracker.
+  MemTracker* NewClientTracker(RuntimeState* state) {
+    return pool_.Add(new MemTracker(-1, "client", state->instance_mem_tracker()));
+  }
+
   BufferedBlockMgr* CreateMgrAndClient(int64_t query_id, int max_buffers, int block_size,
-      int reserved_blocks, bool tolerates_oversubscription, MemTracker* tracker,
+      int reserved_blocks, bool tolerates_oversubscription,
       BufferedBlockMgr::Client** client, RuntimeState** query_state = NULL,
       TQueryOptions* query_options = NULL) {
     RuntimeState* state;
     BufferedBlockMgr* mgr = CreateMgr(query_id, max_buffers, block_size, &state,
         query_options);
-    EXPECT_TRUE(mgr->RegisterClient(Substitute("Client for query $0", query_id),
-        reserved_blocks, tolerates_oversubscription, tracker, state, client).ok());
+
+    MemTracker* client_tracker = NewClientTracker(state);
+    EXPECT_OK(mgr->RegisterClient(Substitute("Client for query $0", query_id),
+        reserved_blocks, tolerates_oversubscription, client_tracker, state, client));
     EXPECT_TRUE(client != NULL);
     if (query_state != NULL) *query_state = state;
     return mgr;
@@ -165,13 +172,11 @@ class BufferedBlockMgrTest : public ::testing::Test {
 
   void CreateMgrsAndClients(int64_t start_query_id, int num_mgrs, int buffers_per_mgr,
       int block_size, int reserved_blocks_per_client, bool tolerates_oversubscription,
-      MemTracker* tracker, vector<BufferedBlockMgr*>* mgrs,
-      vector<BufferedBlockMgr::Client*>* clients) {
+      vector<BufferedBlockMgr*>* mgrs, vector<BufferedBlockMgr::Client*>* clients) {
     for (int i = 0; i < num_mgrs; ++i) {
       BufferedBlockMgr::Client* client;
       BufferedBlockMgr* mgr = CreateMgrAndClient(start_query_id + i, buffers_per_mgr,
-          block_size_, reserved_blocks_per_client, tolerates_oversubscription,
-          tracker, &client);
+          block_size_, reserved_blocks_per_client, tolerates_oversubscription, &client);
       mgrs->push_back(mgr);
       clients->push_back(client);
     }
@@ -179,9 +184,9 @@ class BufferedBlockMgrTest : public ::testing::Test {
 
   // Destroy all created query states and associated block managers.
   void TearDownMgrs() {
-    // Freeing all block managers should clean up all consumed memory.
+    // Tear down the query states, which DCHECKs that the memory consumption of
+    // the query's trackers is zero.
     test_env_->TearDownQueryStates();
-    EXPECT_EQ(test_env_->block_mgr_parent_tracker()->consumption(), 0);
   }
 
   void AllocateBlocks(BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* client,
@@ -292,9 +297,8 @@ class BufferedBlockMgrTest : public ::testing::Test {
     vector<BufferedBlockMgr::Block*> blocks;
     BufferedBlockMgr* block_mgr;
     BufferedBlockMgr::Client* client;
-    block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false,
-        client_tracker_.get(), &client);
-    EXPECT_EQ(test_env_->block_mgr_parent_tracker()->consumption(), 0);
+    block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false, &client);
+    EXPECT_EQ(test_env_->TotalQueryMemoryConsumption(), 0);
 
     // Allocate blocks until max_num_blocks, they should all succeed and memory
     // usage should go up.
@@ -317,7 +321,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
     uint8_t* old_buffer = first_block->buffer();
     status = block_mgr->GetNewBlock(client, first_block, &new_block);
     EXPECT_TRUE(new_block != NULL);
-    EXPECT_TRUE(old_buffer == new_block->buffer());
+    EXPECT_EQ(old_buffer, new_block->buffer());
     EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size);
     EXPECT_TRUE(!first_block->is_pinned());
     blocks.push_back(new_block);
@@ -338,8 +342,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
     int max_num_buffers = 5;
     BufferedBlockMgr* block_mgr;
     BufferedBlockMgr::Client* client;
-    block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false,
-        client_tracker_.get(), &client);
+    block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false, &client);
 
     // Check counters.
     RuntimeProfile* profile = block_mgr->profile();
@@ -398,8 +401,8 @@ class BufferedBlockMgrTest : public ::testing::Test {
     ApiFunction api_function;
 
     BufferedBlockMgr::Client* client;
-    EXPECT_OK(block_mgr->RegisterClient("", 0, false, client_tracker_.get(), state,
-        &client));
+    EXPECT_OK(
+        block_mgr->RegisterClient("", 0, false, NewClientTracker(state), state, &client));
     EXPECT_TRUE(client != NULL);
 
     pinned_blocks.reserve(num_buffers);
@@ -543,9 +546,9 @@ class BufferedBlockMgrTest : public ::testing::Test {
     const int iters = 10000;
     for (int i = 0; i < iters; ++i) {
       shared_ptr<BufferedBlockMgr> mgr;
-      Status status = BufferedBlockMgr::Create(state,
-          test_env_->block_mgr_parent_tracker(), state->runtime_profile(),
-          test_env_->tmp_file_mgr(), block_size_ * num_buffers, block_size_, &mgr);
+      Status status = BufferedBlockMgr::Create(state, state->query_mem_tracker(),
+          state->runtime_profile(), test_env_->tmp_file_mgr(), block_size_ * num_buffers,
+          block_size_, &mgr);
     }
   }
 
@@ -557,9 +560,11 @@ class BufferedBlockMgrTest : public ::testing::Test {
     // Create a shared RuntimeState with no BufferedBlockMgr.
     RuntimeState* shared_state =
         new RuntimeState(TExecPlanFragmentParams(), test_env_->exec_env());
+    shared_state->InitMemTrackers(TUniqueId(), NULL, -1);
+
     for (int i = 0; i < num_threads; ++i) {
-      thread* t = new thread(bind(
-          &BufferedBlockMgrTest::CreateDestroyThread, this, shared_state));
+      thread* t = new thread(
+          bind(&BufferedBlockMgrTest::CreateDestroyThread, this, shared_state));
       workers.add_thread(t);
     }
     workers.join_all();
@@ -578,7 +583,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
   void TestRuntimeStateTeardown(bool write_error, bool wait_for_writes);
 
   scoped_ptr<TestEnv> test_env_;
-  scoped_ptr<MemTracker> client_tracker_;
+  ObjectPool pool_;
   vector<string> created_tmp_dirs_;
 };
 
@@ -593,9 +598,9 @@ TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) {
   int max_num_blocks = 3;
   BufferedBlockMgr* block_mgr;
   BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false,
-      client_tracker_.get(), &client);
-  EXPECT_EQ(0, test_env_->block_mgr_parent_tracker()->consumption());
+  block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false, &client);
+  MemTracker* client_tracker = block_mgr->get_tracker(client);
+  EXPECT_EQ(0, test_env_->TotalQueryMemoryConsumption());
 
   vector<BufferedBlockMgr::Block*> blocks;
 
@@ -604,8 +609,8 @@ TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) {
   EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, 128));
   EXPECT_TRUE(new_block != NULL);
   EXPECT_EQ(block_mgr->bytes_allocated(), 0);
-  EXPECT_EQ(test_env_->block_mgr_parent_tracker()->consumption(), 0);
-  EXPECT_EQ(client_tracker_->consumption(), 128);
+  EXPECT_EQ(block_mgr->mem_tracker()->consumption(), 0);
+  EXPECT_EQ(client_tracker->consumption(), 128);
   EXPECT_TRUE(new_block->is_pinned());
   EXPECT_EQ(new_block->BytesRemaining(), 128);
   EXPECT_TRUE(new_block->buffer() != NULL);
@@ -615,9 +620,8 @@ TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) {
   EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block));
   EXPECT_TRUE(new_block != NULL);
   EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size());
-  EXPECT_EQ(test_env_->block_mgr_parent_tracker()->consumption(),
-            block_mgr->max_block_size());
-  EXPECT_EQ(client_tracker_->consumption(), 128 + block_mgr->max_block_size());
+  EXPECT_EQ(block_mgr->mem_tracker()->consumption(), block_mgr->max_block_size());
+  EXPECT_EQ(client_tracker->consumption(), 128 + block_mgr->max_block_size());
   EXPECT_TRUE(new_block->is_pinned());
   EXPECT_EQ(new_block->BytesRemaining(), block_mgr->max_block_size());
   EXPECT_TRUE(new_block->buffer() != NULL);
@@ -627,9 +631,8 @@ TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) {
   EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, 512));
   EXPECT_TRUE(new_block != NULL);
   EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size());
-  EXPECT_EQ(test_env_->block_mgr_parent_tracker()->consumption(),
-            block_mgr->max_block_size());
-  EXPECT_EQ(client_tracker_->consumption(), 128 + 512 + block_mgr->max_block_size());
+  EXPECT_EQ(block_mgr->mem_tracker()->consumption(), block_mgr->max_block_size());
+  EXPECT_EQ(client_tracker->consumption(), 128 + 512 + block_mgr->max_block_size());
   EXPECT_TRUE(new_block->is_pinned());
   EXPECT_EQ(new_block->BytesRemaining(), 512);
   EXPECT_TRUE(new_block->buffer() != NULL);
@@ -652,8 +655,7 @@ TEST_F(BufferedBlockMgrTest, Pin) {
   const int block_size = 1024;
   BufferedBlockMgr* block_mgr;
   BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false,
-      client_tracker_.get(), &client);
+  block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false, &client);
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_blocks, &blocks);
@@ -703,8 +705,7 @@ TEST_F(BufferedBlockMgrTest, Deletion) {
   const int block_size = 1024;
   BufferedBlockMgr* block_mgr;
   BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false,
-      client_tracker_.get(), &client);
+  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false, &client);
 
   // Check counters.
   RuntimeProfile* profile = block_mgr->profile();
@@ -713,13 +714,13 @@ TEST_F(BufferedBlockMgrTest, Deletion) {
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-  EXPECT_TRUE(created_cnt->value() == max_num_buffers);
+  EXPECT_EQ(created_cnt->value(), max_num_buffers);
 
   DeleteBlocks(blocks);
   blocks.clear();
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-  EXPECT_TRUE(created_cnt->value() == max_num_buffers);
-  EXPECT_TRUE(recycled_cnt->value() == max_num_buffers);
+  EXPECT_EQ(created_cnt->value(), max_num_buffers);
+  EXPECT_EQ(recycled_cnt->value(), max_num_buffers);
 
   DeleteBlocks(blocks);
   TearDownMgrs();
@@ -730,8 +731,9 @@ TEST_F(BufferedBlockMgrTest, Deletion) {
 TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) {
   int max_num_buffers = 16;
   BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
-      0, false, client_tracker_.get(), &client);
+  BufferedBlockMgr* block_mgr =
+      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
+  MemTracker* client_tracker = block_mgr->get_tracker(client);
 
   // Pinned I/O block.
   BufferedBlockMgr::Block* new_block;
@@ -740,16 +742,16 @@ TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) {
   EXPECT_TRUE(new_block->is_pinned());
   EXPECT_TRUE(new_block->is_max_size());
   new_block->Delete();
-  EXPECT_TRUE(client_tracker_->consumption() == 0);
+  EXPECT_EQ(0, client_tracker->consumption());
 
   // Pinned non-I/O block.
   int small_block_size = 128;
   EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, small_block_size));
   EXPECT_TRUE(new_block != NULL);
   EXPECT_TRUE(new_block->is_pinned());
-  EXPECT_EQ(small_block_size, client_tracker_->consumption());
+  EXPECT_EQ(small_block_size, client_tracker->consumption());
   new_block->Delete();
-  EXPECT_EQ(0, client_tracker_->consumption());
+  EXPECT_EQ(0, client_tracker->consumption());
 
   // Unpinned I/O block - delete after written to disk.
   EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block));
@@ -760,7 +762,7 @@ TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) {
   EXPECT_FALSE(new_block->is_pinned());
   WaitForWrites(block_mgr);
   new_block->Delete();
-  EXPECT_TRUE(client_tracker_->consumption() == 0);
+  EXPECT_EQ(client_tracker->consumption(), 0);
 
   // Unpinned I/O block - delete before written to disk.
   EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block));
@@ -771,7 +773,7 @@ TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) {
   EXPECT_FALSE(new_block->is_pinned());
   new_block->Delete();
   WaitForWrites(block_mgr);
-  EXPECT_TRUE(client_tracker_->consumption() == 0);
+  EXPECT_EQ(client_tracker->consumption(), 0);
 
   TearDownMgrs();
 }
@@ -792,8 +794,8 @@ TEST_F(BufferedBlockMgrTest, TransferBufferDuringWrite) {
   const int max_num_buffers = 2;
   BufferedBlockMgr::Client* client;
   RuntimeState* query_state;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
-      1, false, client_tracker_.get(), &client, &query_state);
+  BufferedBlockMgr* block_mgr = CreateMgrAndClient(
+      0, max_num_buffers, block_size_, 1, false, &client, &query_state);
 
   for (int trial = 0; trial < trials; ++trial) {
     for (int delay_ms = 0; delay_ms <= 10; delay_ms += 5) {
@@ -811,8 +813,8 @@ TEST_F(BufferedBlockMgrTest, TransferBufferDuringWrite) {
       // number of buffers.
       int reserved_buffers = trial % max_num_buffers;
       BufferedBlockMgr::Client* tmp_client;
-      EXPECT_TRUE(block_mgr->RegisterClient("tmp_client", reserved_buffers, false,
-          client_tracker_.get(), query_state, &tmp_client).ok());
+      EXPECT_OK(block_mgr->RegisterClient("tmp_client", reserved_buffers, false,
+          NewClientTracker(query_state), query_state, &tmp_client));
       BufferedBlockMgr::Block* tmp_block;
       ASSERT_OK(block_mgr->GetNewBlock(tmp_client, NULL, &tmp_block));
 
@@ -838,8 +840,7 @@ TEST_F(BufferedBlockMgrTest, Close) {
   const int block_size = 1024;
   BufferedBlockMgr* block_mgr;
   BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false,
-      client_tracker_.get(), &client);
+  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false, &client);
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
@@ -866,8 +867,8 @@ TEST_F(BufferedBlockMgrTest, DestructDuringWrite) {
 
   for (int trial = 0; trial < trials; ++trial) {
     BufferedBlockMgr::Client* client;
-    BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
-        0, false, client_tracker_.get(), &client);
+    BufferedBlockMgr* block_mgr =
+        CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
 
     vector<BufferedBlockMgr::Block*> blocks;
     AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
@@ -884,20 +885,23 @@ TEST_F(BufferedBlockMgrTest, DestructDuringWrite) {
   // Destroying test environment will check that all writes have completed.
 }
 
-void BufferedBlockMgrTest::TestRuntimeStateTeardown(bool write_error,
-    bool wait_for_writes) {
+void BufferedBlockMgrTest::TestRuntimeStateTeardown(
+    bool write_error, bool wait_for_writes) {
   const int max_num_buffers = 10;
   RuntimeState* state;
   BufferedBlockMgr::Client* client;
-  CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, client_tracker_.get(),
-      &client, &state);
+  CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client, &state);
 
-  // Hold another reference to block mgr so that it can outlive runtime state.
+  // Hold extra references to block mgr and query mem tracker so they can outlive runtime
+  // state.
   shared_ptr<BufferedBlockMgr> block_mgr;
-  Status status = BufferedBlockMgr::Create(state, test_env_->block_mgr_parent_tracker(),
+  shared_ptr<MemTracker> query_mem_tracker;
+  Status status = BufferedBlockMgr::Create(state, state->query_mem_tracker(),
       state->runtime_profile(), test_env_->tmp_file_mgr(), 0, block_size_, &block_mgr);
   ASSERT_TRUE(status.ok());
   ASSERT_TRUE(block_mgr != NULL);
+  query_mem_tracker = MemTracker::GetQueryMemTracker(
+      state->query_id(), -1, test_env_->exec_env()->process_mem_tracker());
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr.get(), client, max_num_buffers, &blocks);
@@ -927,7 +931,7 @@ void BufferedBlockMgrTest::TestRuntimeStateTeardown(bool write_error,
   if (wait_for_writes) WaitForWrites(block_mgr.get());
   block_mgr.reset();
 
-  EXPECT_TRUE(test_env_->block_mgr_parent_tracker()->consumption() == 0);
+  EXPECT_EQ(test_env_->TotalQueryMemoryConsumption(), 0);
 }
 
 TEST_F(BufferedBlockMgrTest, RuntimeStateTeardown) {
@@ -947,8 +951,8 @@ TEST_F(BufferedBlockMgrTest, WriteCompleteWithCancelledRuntimeState) {
   const int max_num_buffers = 10;
   RuntimeState* state;
   BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_, 0,
-      false, client_tracker_.get(), &client, &state);
+  BufferedBlockMgr* block_mgr =
+      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client, &state);
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
@@ -984,8 +988,7 @@ TEST_F(BufferedBlockMgrTest, WriteError) {
   const int block_size = 1024;
   BufferedBlockMgr* block_mgr;
   BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false,
-      client_tracker_.get(), &client);
+  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false, &client);
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
@@ -1013,8 +1016,8 @@ TEST_F(BufferedBlockMgrTest, WriteError) {
 TEST_F(BufferedBlockMgrTest, TmpFileAllocateError) {
   int max_num_buffers = 2;
   BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_, 0,
-      false, client_tracker_.get(), &client);
+  BufferedBlockMgr* block_mgr =
+      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
@@ -1047,8 +1050,8 @@ TEST_F(BufferedBlockMgrTest, DISABLED_WriteErrorBlacklist) {
   int blocks_per_mgr = MAX_NUM_BLOCKS / NUM_BLOCK_MGRS;
   vector<BufferedBlockMgr*> block_mgrs;
   vector<BufferedBlockMgr::Client*> clients;
-  CreateMgrsAndClients(0, NUM_BLOCK_MGRS, blocks_per_mgr, block_size_, 0, false,
-      client_tracker_.get(), &block_mgrs, &clients);
+  CreateMgrsAndClients(
+      0, NUM_BLOCK_MGRS, blocks_per_mgr, block_size_, 0, false, &block_mgrs, &clients);
 
   // Allocate files for all 2x2 combinations by unpinning blocks.
   vector<vector<BufferedBlockMgr::Block*>> blocks;
@@ -1106,8 +1109,8 @@ TEST_F(BufferedBlockMgrTest, DISABLED_WriteErrorBlacklist) {
   }
   // A new block manager should only use the good dir for backing storage.
   BufferedBlockMgr::Client* new_client;
-  BufferedBlockMgr* new_block_mgr = CreateMgrAndClient(9999, blocks_per_mgr, block_size_,
-      0, false, client_tracker_.get(), &new_client);
+  BufferedBlockMgr* new_block_mgr =
+      CreateMgrAndClient(9999, blocks_per_mgr, block_size_, 0, false, &new_client);
   vector<BufferedBlockMgr::Block*> new_mgr_blocks;
   AllocateBlocks(new_block_mgr, new_client, blocks_per_mgr, &new_mgr_blocks);
   UnpinBlocks(new_mgr_blocks);
@@ -1130,8 +1133,8 @@ TEST_F(BufferedBlockMgrTest, AllocationErrorHandling) {
   vector<RuntimeState*> runtime_states;
   vector<BufferedBlockMgr*> block_mgrs;
   vector<BufferedBlockMgr::Client*> clients;
-  CreateMgrsAndClients(0, num_block_mgrs, blocks_per_mgr, block_size_, 0,
-      false, client_tracker_.get(), &block_mgrs, &clients);
+  CreateMgrsAndClients(
+      0, num_block_mgrs, blocks_per_mgr, block_size_, 0, false, &block_mgrs, &clients);
 
   // Allocate files for all 2x2 combinations by unpinning blocks.
   vector<vector<BufferedBlockMgr::Block*>> blocks;
@@ -1164,8 +1167,8 @@ TEST_F(BufferedBlockMgrTest, NoDirsAllocationError) {
   vector<string> tmp_dirs = InitMultipleTmpDirs(2);
   int max_num_buffers = 2;
   BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
-      0, false, client_tracker_.get(), &client);
+  BufferedBlockMgr* block_mgr =
+      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
   for (int i = 0; i < tmp_dirs.size(); ++i) {
@@ -1183,8 +1186,8 @@ TEST_F(BufferedBlockMgrTest, NoTmpDirs) {
   InitMultipleTmpDirs(0);
   int max_num_buffers = 3;
   BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
-      0, false, client_tracker_.get(), &client);
+  BufferedBlockMgr* block_mgr =
+      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
   DeleteBlocks(blocks);
@@ -1198,7 +1201,7 @@ TEST_F(BufferedBlockMgrTest, ScratchLimitZero) {
   TQueryOptions query_options;
   query_options.scratch_limit = 0;
   BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
-      0, false, client_tracker_.get(), &client, NULL, &query_options);
+      0, false, &client, NULL, &query_options);
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
   DeleteBlocks(blocks);
@@ -1215,11 +1218,11 @@ TEST_F(BufferedBlockMgrTest, MultipleClients) {
 
   BufferedBlockMgr::Client* client1 = NULL;
   BufferedBlockMgr::Client* client2 = NULL;
-  EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false, client_tracker_.get(),
-      runtime_state, &client1));
+  EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false,
+      NewClientTracker(runtime_state), runtime_state, &client1));
   EXPECT_TRUE(client1 != NULL);
-  EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false, client_tracker_.get(),
-      runtime_state, &client2));
+  EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false,
+      NewClientTracker(runtime_state), runtime_state, &client2));
   EXPECT_TRUE(client2 != NULL);
 
   // Reserve client 1's and 2's buffers. They should succeed.
@@ -1317,11 +1320,11 @@ TEST_F(BufferedBlockMgrTest, 
MultipleClientsExtraBuffers) {
   BufferedBlockMgr::Client* client1 = NULL;
   BufferedBlockMgr::Client* client2 = NULL;
   BufferedBlockMgr::Block* block = NULL;
-  EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false, client_tracker_.get(),
-      runtime_state, &client1));
+  EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false,
+      NewClientTracker(runtime_state), runtime_state, &client1));
   EXPECT_TRUE(client1 != NULL);
-  EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false, client_tracker_.get(),
-      runtime_state, &client2));
+  EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false,
+      NewClientTracker(runtime_state), runtime_state, &client2));
   EXPECT_TRUE(client2 != NULL);
 
   vector<BufferedBlockMgr::Block*> client1_blocks;
@@ -1367,14 +1370,14 @@ TEST_F(BufferedBlockMgrTest, ClientOversubscription) {
   BufferedBlockMgr::Client* client2 = NULL;
   BufferedBlockMgr::Client* client3 = NULL;
   BufferedBlockMgr::Block* block = NULL;
-  EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false, client_tracker_.get(),
-      runtime_state, &client1));
+  EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false,
+      NewClientTracker(runtime_state), runtime_state, &client1));
   EXPECT_TRUE(client1 != NULL);
-  EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false, client_tracker_.get(),
-      runtime_state, &client2));
+  EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false,
+      NewClientTracker(runtime_state), runtime_state, &client2));
   EXPECT_TRUE(client2 != NULL);
-  EXPECT_OK(block_mgr->RegisterClient("", client3_buffers, true, client_tracker_.get(),
-      runtime_state, &client3));
+  EXPECT_OK(block_mgr->RegisterClient("", client3_buffers, true,
+      NewClientTracker(runtime_state), runtime_state, &client3));
   EXPECT_TRUE(client3 != NULL);
 
   // Client one allocates first block, should work.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4849e586/be/src/runtime/buffered-block-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h
index ad8ad85..b715ec5 100644
--- a/be/src/runtime/buffered-block-mgr.h
+++ b/be/src/runtime/buffered-block-mgr.h
@@ -393,6 +393,7 @@ class BufferedBlockMgr {
 
   int num_pinned_buffers(Client* client) const;
   int num_reserved_buffers_remaining(Client* client) const;
+  MemTracker* mem_tracker() const { return mem_tracker_.get(); };
   MemTracker* get_tracker(Client* client) const;
   int64_t max_block_size() const { return max_block_size_; }
   int64_t bytes_allocated() const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4849e586/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 5049e8b..8d07584 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -102,8 +102,10 @@ class SimpleTupleStreamTest : public testing::Test {
   /// tracked by tracker_.
   void InitBlockMgr(int64_t limit, int block_size) {
     ASSERT_OK(test_env_->CreateQueryState(0, limit, block_size, &runtime_state_));
-    ASSERT_OK(runtime_state_->block_mgr()->RegisterClient("", 0, false, &tracker_,
-        runtime_state_, &client_));
+    MemTracker* client_tracker = pool_.Add(
+        new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
+    ASSERT_OK(runtime_state_->block_mgr()->RegisterClient(
+        "", 0, false, client_tracker, runtime_state_, &client_));
   }
 
   /// Generate the ith element of a sequence of int values.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4849e586/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index a2c3e9b..17b8ba3 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -140,6 +140,7 @@ class MemTracker {
       DCHECK(!all_trackers_[i]->has_limit());
       all_trackers_[i]->consumption_->Add(bytes);
     }
+    DCHECK(false) << "end_tracker is not an ancestor";
   }
 
   void ReleaseLocal(int64_t bytes, MemTracker* end_tracker) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4849e586/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index c5a9a41..f0caab7 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -37,7 +37,6 @@ TestEnv::TestEnv() {
   exec_env_.reset(new ExecEnv);
   exec_env_->InitForFeTests();
   io_mgr_tracker_.reset(new MemTracker(-1));
-  block_mgr_parent_tracker_.reset(new MemTracker(-1));
   exec_env_->disk_io_mgr()->Init(io_mgr_tracker_.get());
   InitMetrics();
   tmp_file_mgr_.reset(new TmpFileMgr);
@@ -59,7 +58,6 @@ void TestEnv::InitTmpFileMgr(const std::vector<std::string>& tmp_dirs,
 TestEnv::~TestEnv() {
   // Queries must be torn down first since they are dependent on global state.
   TearDownQueryStates();
-  block_mgr_parent_tracker_.reset();
   exec_env_.reset();
   io_mgr_tracker_.reset();
   tmp_file_mgr_.reset();
@@ -82,12 +80,13 @@ Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_si
     return Status("Unexpected error creating RuntimeState");
   }
 
+  (*runtime_state)->InitMemTrackers(TUniqueId(), NULL, -1);
+
   shared_ptr<BufferedBlockMgr> mgr;
   RETURN_IF_ERROR(BufferedBlockMgr::Create(*runtime_state,
-      block_mgr_parent_tracker_.get(), (*runtime_state)->runtime_profile(),
+      (*runtime_state)->query_mem_tracker(), (*runtime_state)->runtime_profile(),
       tmp_file_mgr_.get(), CalculateMemLimit(max_buffers, block_size), block_size, &mgr));
   (*runtime_state)->set_block_mgr(mgr);
-  (*runtime_state)->InitMemTrackers(TUniqueId(), NULL, -1);
 
   query_states_.push_back(shared_ptr<RuntimeState>(*runtime_state));
   return Status::OK();
@@ -116,4 +115,11 @@ int64_t TestEnv::CalculateMemLimit(int max_buffers, int block_size) {
   return max_buffers * static_cast<int64_t>(block_size);
 }
 
+int64_t TestEnv::TotalQueryMemoryConsumption() {
+  int64_t total = 0;
+  for (shared_ptr<RuntimeState>& query_state : query_states_) {
+    total += query_state->query_mem_tracker()->consumption();
+  }
+  return total;
+}
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4849e586/be/src/runtime/test-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index a3ab29a..d30424d 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -54,8 +54,10 @@ class TestEnv {
   /// If max_buffers is -1, no memory limit will apply.
   int64_t CalculateMemLimit(int max_buffers, int block_size);
 
+  /// Return total of mem tracker consumption for all queries.
+  int64_t TotalQueryMemoryConsumption();
+
   ExecEnv* exec_env() { return exec_env_.get(); }
-  MemTracker* block_mgr_parent_tracker() { return block_mgr_parent_tracker_.get(); }
   MemTracker* io_mgr_tracker() { return io_mgr_tracker_.get(); }
   MetricGroup* metrics() { return metrics_.get(); }
   TmpFileMgr* tmp_file_mgr() { return tmp_file_mgr_.get(); }
@@ -72,7 +74,6 @@ class TestEnv {
   /// Global state for test environment.
   static boost::scoped_ptr<MetricGroup> static_metrics_;
   boost::scoped_ptr<ExecEnv> exec_env_;
-  boost::scoped_ptr<MemTracker> block_mgr_parent_tracker_;
   boost::scoped_ptr<MemTracker> io_mgr_tracker_;
   boost::scoped_ptr<MetricGroup> metrics_;
   boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_;

Reply via email to