IMPALA-4831: enforce BufferPool reservation invariants

Before this patch, ill-behaved code outside BufferPool could
violate BufferPool invariants by calling methods on ReservationTracker
such as DecreaseReservation() or ReleaseTo(), or by hooking
up Clients and ReservationTrackers in the wrong way (e.g. sharing
a ReservationTracker between two Clients).

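Now each client creates and owns its own ReservationTracker, and
ClientHandle exposes only a restricted set of reservation methods
(IncreaseReservation(), IncreaseReservationToFit(), GetReservation(),
GetUsedReservation() and GetUnusedReservation()) to code outside
BufferPool. This also reduces the amount of boilerplate code required
to set up and tear down a client.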

Change-Id: Ic5b0c335d6e73250f7e5a3b9ce2f999c5119c573
Reviewed-on: http://gerrit.cloudera.org:8080/6313
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/62894e32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/62894e32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/62894e32

Branch: refs/heads/master
Commit: 62894e323a87f8f48ece7235f2ffb0eac922fbf8
Parents: 87e95f8
Author: Tim Armstrong <[email protected]>
Authored: Tue Mar 7 18:12:15 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Mar 16 02:10:02 2017 +0000

----------------------------------------------------------------------
 .../runtime/bufferpool/buffer-pool-internal.h   |  12 +-
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 148 +++++++------------
 be/src/runtime/bufferpool/buffer-pool.cc        |  71 ++++++---
 be/src/runtime/bufferpool/buffer-pool.h         |  48 ++++--
 be/src/runtime/bufferpool/reservation-tracker.h |   2 +
 be/src/runtime/bufferpool/suballocator-test.cc  |  30 ++--
 be/src/runtime/bufferpool/suballocator.cc       |   2 +-
 7 files changed, 163 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index abc8930..04d76bb 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -76,6 +76,7 @@
 
 #include "runtime/bufferpool/buffer-pool-counters.h"
 #include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "util/condition-variable.h"
 
 namespace impala {
@@ -84,7 +85,8 @@ namespace impala {
 class BufferPool::Client {
  public:
   Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group, const string& 
name,
-      RuntimeProfile* profile);
+      ReservationTracker* parent_reservation, MemTracker* mem_tracker,
+      int64_t reservation_limit, RuntimeProfile* profile);
 
   ~Client() {
     DCHECK_EQ(0, num_pages_);
@@ -93,6 +95,9 @@ class BufferPool::Client {
     DCHECK_EQ(0, in_flight_write_pages_.size());
   }
 
+  /// Release reservation for this client.
+  void Close() { reservation_.Close(); }
+
   /// Add a new pinned page 'page' to the pinned pages list. 'page' must not 
be in any
   /// other lists. Neither the client's lock nor page->buffer_lock should be 
held by the
   /// caller.
@@ -144,6 +149,7 @@ class BufferPool::Client {
     DCHECK(client_lock.mutex() == &lock_ && client_lock.owns_lock());
   }
 
+  ReservationTracker* reservation() { return &reservation_; }
   const BufferPoolClientCounters& counters() const { return counters_; }
   bool spilling_enabled() const { return file_group_ != NULL; }
 
@@ -183,6 +189,10 @@ class BufferPool::Client {
   /// A name identifying the client.
   const std::string name_;
 
+  /// The reservation tracker for the client. All pages pinned by the client 
count as
+  /// usage against 'reservation_'.
+  ReservationTracker reservation_;
+
   /// The RuntimeProfile counters for this client, owned by the client's 
RuntimeProfile.
   /// All non-NULL.
   BufferPoolClientCounters counters_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index b3dbfa9..0e0d384 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <cstdlib>
+#include <limits>
 #include <string>
 #include <vector>
 #include <boost/bind.hpp>
@@ -123,7 +124,6 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* 
pool, int query_id_hi
 
   int clients_per_query = 32;
   BufferPool::ClientHandle* clients[num_queries];
-  ReservationTracker* client_reservations[num_queries];
 
   for (int i = 0; i < num_queries; ++i) {
     int64_t query_id = QueryId(query_id_hi, i);
@@ -140,7 +140,6 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* 
pool, int query_id_hi
     
EXPECT_TRUE(query_reservation->IncreaseReservationToFit(initial_query_reservation));
 
     clients[i] = new BufferPool::ClientHandle[clients_per_query];
-    client_reservations[i] = new ReservationTracker[clients_per_query];
 
     for (int j = 0; j < clients_per_query; ++j) {
       int64_t initial_client_reservation =
@@ -148,13 +147,10 @@ void 
BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi
           < initial_query_reservation % clients_per_query;
       // Reservation limit can be anything greater or equal to the initial 
reservation.
       int64_t client_reservation_limit = initial_client_reservation + rand() % 
100000;
-      client_reservations[i][j].InitChildTracker(
-          NULL, query_reservation, NULL, client_reservation_limit);
-      EXPECT_TRUE(
-          
client_reservations[i][j].IncreaseReservationToFit(initial_client_reservation));
       string name = Substitute("Client $0 for query $1", j, query_id);
-      EXPECT_OK(pool->RegisterClient(
-          name, &client_reservations[i][j], NULL, NewProfile(), 
&clients[i][j]));
+      EXPECT_OK(pool->RegisterClient(name, NULL, query_reservation, NULL,
+          client_reservation_limit, NewProfile(), &clients[i][j]));
+      
EXPECT_TRUE(clients[i][j].IncreaseReservationToFit(initial_client_reservation));
     }
 
     for (int j = 0; j < clients_per_query; ++j) {
@@ -167,11 +163,9 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* 
pool, int query_id_hi
     for (int j = 0; j < clients_per_query; ++j) {
       pool->DeregisterClient(&clients[i][j]);
       ASSERT_FALSE(clients[i][j].is_registered());
-      client_reservations[i][j].Close();
     }
 
     delete[] clients[i];
-    delete[] client_reservations[i];
 
     GetQueryReservationTracker(QueryId(query_id_hi, i))->Close();
   }
@@ -234,12 +228,10 @@ TEST_F(BufferPoolTest, PageCreation) {
   int64_t total_mem = 2 * 2 * max_page_len;
   global_reservations_.InitRootTracker(NULL, total_mem);
   BufferPool pool(TEST_BUFFER_LEN, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, 
total_mem);
-  ASSERT_TRUE(client_tracker->IncreaseReservation(total_mem));
   BufferPool::ClientHandle client;
-  ASSERT_OK(
-      pool.RegisterClient("test client", client_tracker, NULL, NewProfile(), 
&client));
+  ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, 
NULL,
+      total_mem, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(total_mem));
 
   vector<BufferPool::PageHandle> handles(num_pages);
 
@@ -247,7 +239,7 @@ TEST_F(BufferPoolTest, PageCreation) {
   for (int i = 0; i < num_pages; ++i) {
     int size_multiple = 1 << i;
     int64_t page_len = TEST_BUFFER_LEN * size_multiple;
-    int64_t used_before = client_tracker->GetUsedReservation();
+    int64_t used_before = client.GetUsedReservation();
     ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i]));
     ASSERT_TRUE(handles[i].is_open());
     ASSERT_TRUE(handles[i].is_pinned());
@@ -256,19 +248,18 @@ TEST_F(BufferPoolTest, PageCreation) {
     ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data());
     ASSERT_EQ(handles[i].len(), page_len);
     ASSERT_EQ(handles[i].buffer_handle()->len(), page_len);
-    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before + page_len);
+    ASSERT_EQ(client.GetUsedReservation(), used_before + page_len);
   }
 
   // Close the handles and check memory consumption.
   for (int i = 0; i < num_pages; ++i) {
-    int64_t used_before = client_tracker->GetUsedReservation();
+    int64_t used_before = client.GetUsedReservation();
     int page_len = handles[i].len();
     pool.DestroyPage(&client, &handles[i]);
-    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before - page_len);
+    ASSERT_EQ(client.GetUsedReservation(), used_before - page_len);
   }
 
   pool.DeregisterClient(&client);
-  client_tracker->Close();
 
   // All the reservations should be released at this point.
   ASSERT_EQ(global_reservations_.GetReservation(), 0);
@@ -282,12 +273,10 @@ TEST_F(BufferPoolTest, BufferAllocation) {
   int64_t total_mem = 2 * 2 * max_buffer_len;
   global_reservations_.InitRootTracker(NULL, total_mem);
   BufferPool pool(TEST_BUFFER_LEN, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, 
total_mem);
-  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(total_mem));
   BufferPool::ClientHandle client;
-  ASSERT_OK(
-      pool.RegisterClient("test client", client_tracker, NULL, NewProfile(), 
&client));
+  ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, 
NULL,
+      total_mem, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservationToFit(total_mem));
 
   vector<BufferPool::BufferHandle> handles(num_buffers);
 
@@ -295,24 +284,23 @@ TEST_F(BufferPoolTest, BufferAllocation) {
   for (int i = 0; i < num_buffers; ++i) {
     int size_multiple = 1 << i;
     int64_t buffer_len = TEST_BUFFER_LEN * size_multiple;
-    int64_t used_before = client_tracker->GetUsedReservation();
+    int64_t used_before = client.GetUsedReservation();
     ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
     ASSERT_TRUE(handles[i].is_open());
     ASSERT_TRUE(handles[i].data() != NULL);
     ASSERT_EQ(handles[i].len(), buffer_len);
-    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before + buffer_len);
+    ASSERT_EQ(client.GetUsedReservation(), used_before + buffer_len);
   }
 
   // Close the handles and check memory consumption.
   for (int i = 0; i < num_buffers; ++i) {
-    int64_t used_before = client_tracker->GetUsedReservation();
+    int64_t used_before = client.GetUsedReservation();
     int buffer_len = handles[i].len();
     pool.FreeBuffer(&client, &handles[i]);
-    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before - buffer_len);
+    ASSERT_EQ(client.GetUsedReservation(), used_before - buffer_len);
   }
 
   pool.DeregisterClient(&client);
-  client_tracker->Close();
 
   // All the reservations should be released at this point.
   ASSERT_EQ(global_reservations_.GetReservation(), 0);
@@ -326,15 +314,12 @@ TEST_F(BufferPoolTest, BufferTransfer) {
   int64_t total_mem = num_clients * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NULL, total_mem);
   BufferPool pool(TEST_BUFFER_LEN, total_mem);
-  ReservationTracker client_trackers[num_clients];
   BufferPool::ClientHandle clients[num_clients];
   BufferPool::BufferHandle handles[num_clients];
   for (int i = 0; i < num_clients; ++i) {
-    client_trackers[i].InitChildTracker(
-        NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
-    ASSERT_TRUE(client_trackers[i].IncreaseReservationToFit(TEST_BUFFER_LEN));
-    ASSERT_OK(pool.RegisterClient(
-        "test client", &client_trackers[i], NULL, NewProfile(), &clients[i]));
+    ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, 
NULL,
+        TEST_BUFFER_LEN, NewProfile(), &clients[i]));
+    ASSERT_TRUE(clients[i].IncreaseReservationToFit(TEST_BUFFER_LEN));
   }
 
   // Transfer the page around between the clients repeatedly in a circle.
@@ -347,19 +332,16 @@ TEST_F(BufferPoolTest, BufferTransfer) {
           &clients[next_client], &handles[next_client]));
       // Check that the transfer left things in a consistent state.
       ASSERT_FALSE(handles[client].is_open());
-      ASSERT_EQ(0, client_trackers[client].GetUsedReservation());
+      ASSERT_EQ(0, clients[client].GetUsedReservation());
       ASSERT_TRUE(handles[next_client].is_open());
-      ASSERT_EQ(TEST_BUFFER_LEN, 
client_trackers[next_client].GetUsedReservation());
+      ASSERT_EQ(TEST_BUFFER_LEN, clients[next_client].GetUsedReservation());
       // The same underlying buffer should be used.
       ASSERT_EQ(data, handles[next_client].data());
     }
   }
 
   pool.FreeBuffer(&clients[0], &handles[0]);
-  for (int i = 0; i < num_clients; ++i) {
-    pool.DeregisterClient(&clients[i]);
-    client_trackers[i].Close();
-  }
+  for (BufferPool::ClientHandle& client : clients) 
pool.DeregisterClient(&client);
   ASSERT_EQ(global_reservations_.GetReservation(), 0);
   global_reservations_.Close();
 }
@@ -371,13 +353,10 @@ TEST_F(BufferPoolTest, Pin) {
   int64_t child_reservation = TEST_BUFFER_LEN * 2;
   BufferPool pool(TEST_BUFFER_LEN, total_mem);
   global_reservations_.InitRootTracker(NULL, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(
-      NewProfile(), &global_reservations_, NULL, child_reservation);
-  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
   BufferPool::ClientHandle client;
-  ASSERT_OK(pool.RegisterClient(
-      "test client", client_tracker, NewFileGroup(), NewProfile(), &client));
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), 
&global_reservations_,
+      NULL, child_reservation, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservationToFit(child_reservation));
 
   BufferPool::PageHandle handle1, handle2;
 
@@ -416,7 +395,6 @@ TEST_F(BufferPoolTest, Pin) {
   pool.DestroyPage(&client, &double_handle);
 
   pool.DeregisterClient(&client);
-  client_tracker->Close();
 }
 
 /// Creating a page or pinning without sufficient reservation should DCHECK.
@@ -424,18 +402,15 @@ TEST_F(BufferPoolTest, PinWithoutReservation) {
   int64_t total_mem = TEST_BUFFER_LEN * 1024;
   BufferPool pool(TEST_BUFFER_LEN, total_mem);
   global_reservations_.InitRootTracker(NULL, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(
-      NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
   BufferPool::ClientHandle client;
-  ASSERT_OK(
-      pool.RegisterClient("test client", client_tracker, NULL, NewProfile(), 
&client));
+  ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, 
NULL,
+      TEST_BUFFER_LEN, NewProfile(), &client));
 
   BufferPool::PageHandle handle;
   IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, 
&handle), "");
 
   // Should succeed after increasing reservation.
-  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(TEST_BUFFER_LEN));
+  ASSERT_TRUE(client.IncreaseReservationToFit(TEST_BUFFER_LEN));
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle));
 
   // But we can't pin again.
@@ -443,7 +418,6 @@ TEST_F(BufferPoolTest, PinWithoutReservation) {
 
   pool.DestroyPage(&client, &handle);
   pool.DeregisterClient(&client);
-  client_tracker->Close();
 }
 
 TEST_F(BufferPoolTest, ExtractBuffer) {
@@ -452,13 +426,10 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
   int64_t child_reservation = TEST_BUFFER_LEN * 2;
   BufferPool pool(TEST_BUFFER_LEN, total_mem);
   global_reservations_.InitRootTracker(NULL, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(
-      NewProfile(), &global_reservations_, NULL, child_reservation);
-  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
   BufferPool::ClientHandle client;
-  ASSERT_OK(pool.RegisterClient(
-      "test client", client_tracker, NewFileGroup(), NewProfile(), &client));
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), 
&global_reservations_,
+      NULL, child_reservation, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservationToFit(child_reservation));
 
   BufferPool::PageHandle page;
   BufferPool::BufferHandle buffer;
@@ -472,24 +443,24 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
     ASSERT_TRUE(buffer.is_open());
     ASSERT_EQ(len, buffer.len());
     ASSERT_EQ(page_data, buffer.data());
-    ASSERT_EQ(len, client_tracker->GetUsedReservation());
+    ASSERT_EQ(len, client.GetUsedReservation());
     pool.FreeBuffer(&client, &buffer);
-    ASSERT_EQ(0, client_tracker->GetUsedReservation());
+    ASSERT_EQ(0, client.GetUsedReservation());
   }
 
   // Test that ExtractBuffer() accounts correctly for pin count > 1.
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
   uint8_t* page_data = page.data();
   ASSERT_OK(pool.Pin(&client, &page));
-  ASSERT_EQ(TEST_BUFFER_LEN * 2, client_tracker->GetUsedReservation());
+  ASSERT_EQ(TEST_BUFFER_LEN * 2, client.GetUsedReservation());
   pool.ExtractBuffer(&client, &page, &buffer);
-  ASSERT_EQ(TEST_BUFFER_LEN, client_tracker->GetUsedReservation());
+  ASSERT_EQ(TEST_BUFFER_LEN, client.GetUsedReservation());
   ASSERT_FALSE(page.is_open());
   ASSERT_TRUE(buffer.is_open());
   ASSERT_EQ(TEST_BUFFER_LEN, buffer.len());
   ASSERT_EQ(page_data, buffer.data());
   pool.FreeBuffer(&client, &buffer);
-  ASSERT_EQ(0, client_tracker->GetUsedReservation());
+  ASSERT_EQ(0, client.GetUsedReservation());
 
   // Test that ExtractBuffer() DCHECKs for unpinned pages.
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
@@ -498,7 +469,6 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
   pool.DestroyPage(&client, &page);
 
   pool.DeregisterClient(&client);
-  client_tracker->Close();
 }
 
 // Test concurrent creation and destruction of pages.
@@ -534,22 +504,18 @@ TEST_F(BufferPoolTest, ConcurrentPageCreation) {
 
 void BufferPoolTest::CreatePageLoop(BufferPool* pool, TmpFileMgr::FileGroup* 
file_group,
     ReservationTracker* parent_tracker, int num_ops) {
-  ReservationTracker client_tracker;
-  client_tracker.InitChildTracker(NewProfile(), parent_tracker, NULL, 
TEST_BUFFER_LEN);
   BufferPool::ClientHandle client;
-  ASSERT_OK(pool->RegisterClient(
-      "test client", &client_tracker, file_group, NewProfile(), &client));
+  ASSERT_OK(pool->RegisterClient("test client", file_group, parent_tracker, 
NULL,
+      TEST_BUFFER_LEN, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TEST_BUFFER_LEN));
   for (int i = 0; i < num_ops; ++i) {
     BufferPool::PageHandle handle;
-    ASSERT_TRUE(client_tracker.IncreaseReservation(TEST_BUFFER_LEN));
     ASSERT_OK(pool->CreatePage(&client, TEST_BUFFER_LEN, &handle));
     pool->Unpin(&client, &handle);
     ASSERT_OK(pool->Pin(&client, &handle));
     pool->DestroyPage(&client, &handle);
-    client_tracker.DecreaseReservation(TEST_BUFFER_LEN);
   }
   pool->DeregisterClient(&client);
-  client_tracker.Close();
 }
 
 /// Test that DCHECK fires when trying to unpin a page with spilling disabled.
@@ -559,9 +525,9 @@ TEST_F(BufferPoolTest, SpillingDisabledDcheck) {
   BufferPool::PageHandle handle;
 
   BufferPool::ClientHandle client;
-  ASSERT_OK(pool.RegisterClient(
-      "test client", &global_reservations_, NULL, NewProfile(), &client));
-  ASSERT_TRUE(global_reservations_.IncreaseReservation(2 * TEST_BUFFER_LEN));
+  ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, 
NULL,
+      numeric_limits<int64_t>::max(), NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(2 * TEST_BUFFER_LEN));
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle));
 
   ASSERT_OK(pool.Pin(&client, &handle));
@@ -582,9 +548,9 @@ TEST_F(BufferPoolTest, EvictPageSameClient) {
   BufferPool::PageHandle handle1, handle2;
 
   BufferPool::ClientHandle client;
-  ASSERT_OK(pool.RegisterClient(
-      "test client", &global_reservations_, NewFileGroup(), NewProfile(), 
&client));
-  ASSERT_TRUE(global_reservations_.IncreaseReservation(TEST_BUFFER_LEN));
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), 
&global_reservations_,
+      NULL, TEST_BUFFER_LEN, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TEST_BUFFER_LEN));
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
 
   // Do not have enough reservations because we pinned the page.
@@ -607,9 +573,9 @@ TEST_F(BufferPoolTest, EvictPageDifferentSizes) {
   BufferPool::PageHandle handle1, handle2;
 
   BufferPool::ClientHandle client;
-  ASSERT_OK(pool.RegisterClient(
-      "test client", &global_reservations_, NewFileGroup(), NewProfile(), 
&client));
-  ASSERT_TRUE(global_reservations_.IncreaseReservation(2 * TEST_BUFFER_LEN));
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), 
&global_reservations_,
+      NULL, TOTAL_BYTES, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(2 * TEST_BUFFER_LEN));
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
   pool.Unpin(&client, &handle1);
 
@@ -635,14 +601,11 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) {
   global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
   BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES);
 
-  ReservationTracker client_reservations[NUM_CLIENTS];
   BufferPool::ClientHandle clients[NUM_CLIENTS];
   for (int i = 0; i < NUM_CLIENTS; ++i) {
-    client_reservations[i].InitChildTracker(
-        NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
-    ASSERT_TRUE(client_reservations[i].IncreaseReservation(TEST_BUFFER_LEN));
-    ASSERT_OK(pool.RegisterClient(Substitute("test client $0", i),
-        &client_reservations[i], NewFileGroup(), NewProfile(), &clients[i]));
+    ASSERT_OK(pool.RegisterClient(Substitute("test client $0", i), 
NewFileGroup(),
+        &global_reservations_, NULL, TEST_BUFFER_LEN, NewProfile(), 
&clients[i]));
+    ASSERT_TRUE(clients[i].IncreaseReservation(TEST_BUFFER_LEN));
   }
 
   // Create a pinned and unpinned page for the first client.
@@ -668,10 +631,7 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) {
   pool.DestroyPage(&clients[0], &handle1);
   pool.DestroyPage(&clients[0], &handle2);
   pool.FreeBuffer(&clients[1], &buffer);
-  for (int i = 0; i < NUM_CLIENTS; ++i) {
-    pool.DeregisterClient(&clients[i]);
-    client_reservations[i].Close();
-  }
+  for (BufferPool::ClientHandle& client : clients) 
pool.DeregisterClient(&client);
 }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 7a026d6..842501f 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -24,7 +24,6 @@
 #include "common/names.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/bufferpool/buffer-allocator.h"
-#include "runtime/bufferpool/reservation-tracker.h"
 #include "util/bit-util.h"
 #include "util/runtime-profile-counters.h"
 #include "util/uid-util.h"
@@ -129,19 +128,19 @@ BufferPool::~BufferPool() {
   DCHECK_EQ(0, clean_pages_.size());
 }
 
-Status BufferPool::RegisterClient(const string& name, ReservationTracker* 
reservation,
-    TmpFileMgr::FileGroup* file_group, RuntimeProfile* profile, ClientHandle* 
client) {
+Status BufferPool::RegisterClient(const string& name, TmpFileMgr::FileGroup* 
file_group,
+    ReservationTracker* parent_reservation, MemTracker* mem_tracker,
+    int64_t reservation_limit, RuntimeProfile* profile, ClientHandle* client) {
   DCHECK(!client->is_registered());
-  DCHECK(reservation != NULL);
-  client->reservation_ = reservation;
-  client->impl_ = new Client(this, file_group, name, profile);
+  DCHECK(parent_reservation != NULL);
+  client->impl_ = new Client(this, file_group, name, parent_reservation, 
mem_tracker,
+      reservation_limit, profile);
   return Status::OK();
 }
 
 void BufferPool::DeregisterClient(ClientHandle* client) {
   if (!client->is_registered()) return;
-  client->reservation_->Close(); // Will DCHECK if any remaining buffers or 
pinned pages.
-  client->reservation_ = NULL;
+  client->impl_->Close(); // Will DCHECK if any remaining buffers or pinned 
pages.
   delete client->impl_; // Will DCHECK if there are any remaining pages.
   client->impl_ = NULL;
 }
@@ -190,7 +189,7 @@ Status BufferPool::Pin(ClientHandle* client, PageHandle* 
handle) {
   }
   // Update accounting last to avoid complicating the error return path above.
   ++page->pin_count;
-  client->reservation_->AllocateFrom(page->len);
+  client->impl_->reservation()->AllocateFrom(page->len);
   return Status::OK();
 }
 
@@ -201,10 +200,11 @@ void BufferPool::Unpin(ClientHandle* client, PageHandle* 
handle) {
   // If handle is pinned, we can assume that the page itself is pinned.
   DCHECK(handle->is_pinned());
   Page* page = handle->page_;
-  client->reservation_->ReleaseTo(page->len);
+  ReservationTracker* reservation = client->impl_->reservation();
+  reservation->ReleaseTo(page->len);
 
   if (--page->pin_count > 0) return;
-  
client->impl_->MoveToDirtyUnpinned(client->reservation_->GetUnusedReservation(),
 page);
+  client->impl_->MoveToDirtyUnpinned(reservation->GetUnusedReservation(), 
page);
   COUNTER_ADD(client->impl_->counters().total_unpinned_bytes, handle->len());
   COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, handle->len());
 }
@@ -225,8 +225,9 @@ void BufferPool::ExtractBuffer(
 
 Status BufferPool::AllocateBuffer(
     ClientHandle* client, int64_t len, BufferHandle* handle) {
-  
RETURN_IF_ERROR(client->impl_->CleanPagesBeforeAllocation(client->reservation_, 
len));
-  client->reservation_->AllocateFrom(len);
+  ReservationTracker* reservation = client->impl_->reservation();
+  RETURN_IF_ERROR(client->impl_->CleanPagesBeforeAllocation(reservation, len));
+  reservation->AllocateFrom(len);
   return AllocateBufferInternal(client, len, handle);
 }
 
@@ -258,7 +259,7 @@ Status BufferPool::AllocateBufferInternal(
 void BufferPool::FreeBuffer(ClientHandle* client, BufferHandle* handle) {
   if (!handle->is_open()) return; // Should be idempotent.
   DCHECK_EQ(client, handle->client_);
-  client->reservation_->ReleaseTo(handle->len_);
+  client->impl_->reservation()->ReleaseTo(handle->len_);
   FreeBufferInternal(handle);
 }
 
@@ -277,8 +278,8 @@ Status BufferPool::TransferBuffer(ClientHandle* src_client, 
BufferHandle* src,
   DCHECK_NE(src, dst);
   DCHECK_NE(src_client, dst_client);
 
-  dst_client->reservation_->AllocateFrom(src->len());
-  src_client->reservation_->ReleaseTo(src->len());
+  dst_client->impl_->reservation()->AllocateFrom(src->len());
+  src_client->impl_->reservation()->ReleaseTo(src->len());
   *dst = std::move(*src);
   dst->client_ = dst_client;
   return Status::OK();
@@ -348,14 +349,37 @@ Status BufferPool::EvictCleanPages(int64_t 
bytes_to_evict) {
   return Status::OK();
 }
 
+bool BufferPool::ClientHandle::IncreaseReservation(int64_t bytes) {
+  return impl_->reservation()->IncreaseReservation(bytes);
+}
+
+bool BufferPool::ClientHandle::IncreaseReservationToFit(int64_t bytes) {
+  return impl_->reservation()->IncreaseReservationToFit(bytes);
+}
+
+int64_t BufferPool::ClientHandle::GetReservation() const {
+  return impl_->reservation()->GetReservation();
+}
+
+int64_t BufferPool::ClientHandle::GetUsedReservation() const {
+  return impl_->reservation()->GetUsedReservation();
+}
+
+int64_t BufferPool::ClientHandle::GetUnusedReservation() const {
+  return impl_->reservation()->GetUnusedReservation();
+}
+
 BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
-    const string& name, RuntimeProfile* profile)
+    const string& name, ReservationTracker* parent_reservation, MemTracker* 
mem_tracker,
+    int64_t reservation_limit, RuntimeProfile* profile)
   : pool_(pool),
     file_group_(file_group),
     name_(name),
     num_pages_(0),
     dirty_unpinned_bytes_(0),
     in_flight_write_bytes_(0) {
+  reservation_.InitChildTracker(
+      profile, parent_reservation, mem_tracker, reservation_limit);
   counters_.get_buffer_time = ADD_TIMER(profile, "BufferPoolGetBufferTime");
   counters_.read_wait_time = ADD_TIMER(profile, "BufferPoolReadIoWaitTime");
   counters_.read_io_ops = ADD_COUNTER(profile, "BufferPoolReadIoOps", 
TUnit::UNIT);
@@ -476,8 +500,8 @@ Status BufferPool::Client::MoveEvictedToPinned(
     unique_lock<mutex>* client_lock, ClientHandle* client, PageHandle* handle) 
{
   Page* page = handle->page_;
   DCHECK(!page->buffer.is_open());
-  RETURN_IF_ERROR(
-      CleanPagesBeforeAllocationLocked(client_lock, client->reservation_, 
page->len));
+  RETURN_IF_ERROR(CleanPagesBeforeAllocationLocked(
+      client_lock, client->impl_->reservation(), page->len));
 
   // Don't hold any locks while allocating or reading back the data. It is 
safe to modify
   // the page's buffer handle without holding any locks because no concurrent 
operations
@@ -620,9 +644,9 @@ string BufferPool::Client::DebugString() {
   lock_guard<mutex> lock(lock_);
   stringstream ss;
   ss << Substitute("<BufferPool::Client> $0 name: $1 write_status: $2 
num_pages: $3 "
-                   "dirty_unpinned_bytes: $4 in_flight_write_bytes: $5",
+                   "dirty_unpinned_bytes: $4 in_flight_write_bytes: $5 
reservation: {$6}",
       this, name_, write_status_.GetDetail(), num_pages_, 
dirty_unpinned_bytes_,
-      in_flight_write_bytes_);
+      in_flight_write_bytes_, reservation_.DebugString());
   ss << "\n  " << pinned_pages_.size() << " pinned pages: ";
   pinned_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
   ss << "\n  " << dirty_unpinned_pages_.size() << " dirty unpinned pages: ";
@@ -634,9 +658,8 @@ string BufferPool::Client::DebugString() {
 
 string BufferPool::ClientHandle::DebugString() const {
   if (is_registered()) {
-    return Substitute("<BufferPool::Client> $0 reservation: {$1} "
-                      "internal state: {$2}",
-        this, reservation_->DebugString(), impl_->DebugString());
+    return Substitute(
+        "<BufferPool::Client> $0 internal state: {$1}", this, 
impl_->DebugString());
   } else {
     return Substitute("<BufferPool::ClientHandle> $0 UNREGISTERED", this);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 4e62b3b..e27e645 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -161,17 +161,23 @@ class BufferPool : public CacheLineAligned {
 
   /// Register a client. Returns an error status and does not register the client if the
   /// arguments are invalid. 'name' is an arbitrary name used to identify the client in
-  /// any errors messages or logging. Counters for this client are added to the (non-NULL)
+  /// any errors messages or logging. If 'file_group' is non-NULL, it is used to allocate
+  /// scratch space to write unpinned pages to disk. If it is NULL, unpinning of pages is
+  /// not allowed for this client. Counters for this client are added to the (non-NULL)
   /// 'profile'. 'client' is the client to register. 'client' should not already be
-  /// registered. If 'file_group' is non-NULL, it is used to allocate scratch space to
-  /// write unpinned pages to disk. If it is NULL, unpinning of pages is not allowed for
-  /// this client.
-  Status RegisterClient(const std::string& name, ReservationTracker* reservation,
-      TmpFileMgr::FileGroup* file_group, RuntimeProfile* profile,
+  /// registered.
+  ///
+  /// The client's reservation is created as a child of 'parent_reservation' with limit
+  /// 'reservation_limit' and associated with MemTracker 'mem_tracker'. The initial
+  /// reservation is 0 bytes.
+  Status RegisterClient(const std::string& name, TmpFileMgr::FileGroup* file_group,
+      ReservationTracker* parent_reservation, MemTracker* mem_tracker,
+      int64_t reservation_limit, RuntimeProfile* profile,
       ClientHandle* client) WARN_UNUSED_RESULT;
 
   /// Deregister 'client' if it is registered. All pages must be destroyed and buffers
-  /// must be freed for the client before calling this. Idempotent.
+  /// must be freed for the client before calling this. Releases any reservation that
+  /// belongs to the client. Idempotent.
   void DeregisterClient(ClientHandle* client);
 
   /// Create a new page of 'len' bytes with pin count 1. 'len' must be a page length
@@ -316,12 +322,27 @@ class BufferPool : public CacheLineAligned {
 /// Client methods or BufferPool methods with the Client as an argument is not supported.
 class BufferPool::ClientHandle {
  public:
-  ClientHandle() : reservation_(NULL) {}
+  ClientHandle() : impl_(NULL) {}
   /// Client must be deregistered.
   ~ClientHandle() { DCHECK(!is_registered()); }
 
-  bool is_registered() const { return reservation_ != NULL; }
-  ReservationTracker* reservation() { return reservation_; }
+  /// Request to increase reservation for this client by 'bytes' by calling
+  /// ReservationTracker::IncreaseReservation(). Returns true if the reservation was
+  /// successfully increased.
+  bool IncreaseReservation(int64_t bytes) WARN_UNUSED_RESULT;
+
+  /// Tries to ensure that 'bytes' of unused reservation is available for this client
+  /// to use by calling ReservationTracker::IncreaseReservationToFit(). Returns true
+  /// if successful, after which 'bytes' can be used.
+  bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT;
+
+  /// Accessors for this client's reservation corresponding to the identically-named
+  /// methods in ReservationTracker.
+  int64_t GetReservation() const;
+  int64_t GetUsedReservation() const;
+  int64_t GetUnusedReservation() const;
+
+  bool is_registered() const { return impl_ != NULL; }
 
   std::string DebugString() const;
 
@@ -329,11 +350,8 @@ class BufferPool::ClientHandle {
   friend class BufferPool;
   DISALLOW_COPY_AND_ASSIGN(ClientHandle);
 
-  /// The reservation tracker for the client. NULL means the client isn't registered.
-  /// All pages pinned by the client count as usage against 'reservation_'.
-  ReservationTracker* reservation_;
-
-  /// Internal state for the client. Owned by BufferPool.
+  /// Internal state for the client. NULL means the client isn't registered.
+  /// Owned by BufferPool.
   Client* impl_;
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h
index 9edd37f..3d40fcc 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -108,6 +108,8 @@ class ReservationTracker {
   /// If the tracker is initialized, deregister the ReservationTracker from its parent,
   /// relinquishing all this tracker's reservation. All of the reservation must be unused
   /// and all the tracker's children must be closed before calling this method.
+  /// TODO: decide on and implement policy for how far to release the reservation up
+  /// the tree. Currently the reservation is released all the way to the root.
   void Close();
 
   /// Request to increase reservation by 'bytes'. The request is either granted in

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/suballocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator-test.cc b/be/src/runtime/bufferpool/suballocator-test.cc
index 01f0ea6..13bc597 100644
--- a/be/src/runtime/bufferpool/suballocator-test.cc
+++ b/be/src/runtime/bufferpool/suballocator-test.cc
@@ -17,6 +17,7 @@
 
 #include <algorithm>
 #include <cstdlib>
+#include <limits>
 #include <random>
 #include <string>
 #include <vector>
@@ -85,11 +86,11 @@ class SuballocatorTest : public ::testing::Test {
   /// Register a client with 'buffer_pool_'. The client is automatically deregistered
   /// and freed at the end of the test.
   void RegisterClient(
-      ReservationTracker* reservation, BufferPool::ClientHandle** client) {
+      ReservationTracker* parent_reservation, BufferPool::ClientHandle** client) {
     clients_.push_back(make_unique<BufferPool::ClientHandle>());
     *client = clients_.back().get();
-    ASSERT_OK(buffer_pool_->RegisterClient(
-        "test client", reservation, NULL, profile(), *client));
+    ASSERT_OK(buffer_pool_->RegisterClient("test client", NULL, parent_reservation, NULL,
+        numeric_limits<int64_t>::max(), profile(), *client));
   }
 
   /// Assert that the memory for all of the suballocations is writable and disjoint by
@@ -104,8 +105,8 @@ class SuballocatorTest : public ::testing::Test {
     allocs->clear();
   }
 
-  static void ExpectReservationUnused(ReservationTracker& reservation) {
-    EXPECT_EQ(reservation.GetUsedReservation(), 0) << reservation.DebugString();
+  static void ExpectReservationUnused(BufferPool::ClientHandle* client) {
+    EXPECT_EQ(client->GetUsedReservation(), 0) << client->DebugString();
   }
 
   RuntimeProfile* profile() { return profile_.get(); }
@@ -165,10 +166,10 @@ TEST_F(SuballocatorTest, SameSizeAllocations) {
   AssertMemoryValid(allocs);
 
   // Check that reservation usage matches the amount allocated.
-  EXPECT_EQ(global_reservation_.GetUsedReservation(), allocated_mem)
+  EXPECT_EQ(client->GetUsedReservation(), allocated_mem)
       << global_reservation_.DebugString();
   FreeAllocations(&allocator, &allocs);
-  ExpectReservationUnused(global_reservation_);
+  ExpectReservationUnused(client);
 }
 
 /// Check behaviour of zero-length allocation.
@@ -185,7 +186,7 @@ TEST_F(SuballocatorTest, ZeroLengthAllocation) {
   ASSERT_TRUE(alloc != nullptr) << global_reservation_.DebugString();
   EXPECT_EQ(alloc->len(), Suballocator::MIN_ALLOCATION_BYTES);
   allocator.Free(move(alloc));
-  ExpectReservationUnused(global_reservation_);
+  ExpectReservationUnused(client);
 }
 
 /// Check behaviour of out-of-range allocation.
@@ -203,7 +204,7 @@ TEST_F(SuballocatorTest, OutOfRangeAllocations) {
   // Too-large allocations fail gracefully.
   ASSERT_FALSE(allocator.Allocate(Suballocator::MAX_ALLOCATION_BYTES + 1, &alloc).ok())
       << global_reservation_.DebugString();
-  ExpectReservationUnused(global_reservation_);
+  ExpectReservationUnused(client);
 }
 
 /// Basic test to make sure that non-power-of-two suballocations are handled as expected
@@ -235,14 +236,13 @@ TEST_F(SuballocatorTest, NonPowerOfTwoAllocations) {
     // Check that it was rounded up to a power-of-two.
     EXPECT_EQ(alloc->len(), max(Suballocator::MIN_ALLOCATION_BYTES,
                                 BitUtil::RoundUpToPowerOfTwo(alloc_size)));
-    EXPECT_EQ(
-        max(TEST_BUFFER_LEN, alloc->len()), global_reservation_.GetUsedReservation())
+    EXPECT_EQ(max(TEST_BUFFER_LEN, alloc->len()), client->GetUsedReservation())
         << global_reservation_.DebugString();
     memset(alloc->data(), 0, alloc->len()); // Check memory is writable.
 
     allocator.Free(move(alloc));
   }
-  ExpectReservationUnused(global_reservation_);
+  ExpectReservationUnused(client);
 }
 
 /// Test that simulates hash table's patterns of doubling suballocations and validates
@@ -292,12 +292,12 @@ TEST_F(SuballocatorTest, DoublingAllocations) {
     // coalesced two buddies of curr_alloc_size / 2) and one buffer with only
     // 'curr_alloc_size' bytes in use (if an Allocate() call couldn't recycle memory and
     // had to allocate a new buffer).
-    EXPECT_LE(global_reservation_.GetUsedReservation(),
+    EXPECT_LE(client->GetUsedReservation(),
         TEST_BUFFER_LEN + max(TEST_BUFFER_LEN, curr_alloc_size * NUM_ALLOCS));
   }
   // Check that reservation usage behaves as expected.
   FreeAllocations(&allocator, &allocs);
-  ExpectReservationUnused(global_reservation_);
+  ExpectReservationUnused(client);
 }
 
 /// Do some randomised testing of the allocator. Simulate some interesting patterns with
@@ -352,7 +352,7 @@ TEST_F(SuballocatorTest, RandomAllocations) {
   }
   // Check that memory is released when suballocations are freed.
   FreeAllocations(&allocator, &allocs);
-  ExpectReservationUnused(global_reservation_);
+  ExpectReservationUnused(client);
 }
 
 void SuballocatorTest::AssertMemoryValid(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/suballocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator.cc b/be/src/runtime/bufferpool/suballocator.cc
index a4835a4..a6ab1ff 100644
--- a/be/src/runtime/bufferpool/suballocator.cc
+++ b/be/src/runtime/bufferpool/suballocator.cc
@@ -89,7 +89,7 @@ int Suballocator::ComputeListIndex(int64_t bytes) const {
 Status Suballocator::AllocateBuffer(int64_t bytes, unique_ptr<Suballocation>* result) {
   DCHECK_LE(bytes, MAX_ALLOCATION_BYTES);
   const int64_t buffer_len = max(min_buffer_len_, BitUtil::RoundUpToPowerOfTwo(bytes));
-  if (!client_->reservation()->IncreaseReservationToFit(buffer_len)) {
+  if (!client_->IncreaseReservationToFit(buffer_len)) {
     *result = nullptr;
     return Status::OK();
   }


Reply via email to