IMPALA-4831: enforce BufferPool reservation invariants. Before this patch, ill-behaved code outside BufferPool could violate BufferPool invariants by calling ReservationTracker methods such as DecreaseReservation() or ReleaseTo(), or by hooking up Clients and ReservationTrackers in the wrong way (e.g. sharing a ReservationTracker between two Clients).
Now each client creates and owns its ReservationTracker and restricts which methods can be called from outside BufferPool. This also reduces the amount of boilerplate code required to set up and tear down a client. Change-Id: Ic5b0c335d6e73250f7e5a3b9ce2f999c5119c573 Reviewed-on: http://gerrit.cloudera.org:8080/6313 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/62894e32 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/62894e32 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/62894e32 Branch: refs/heads/master Commit: 62894e323a87f8f48ece7235f2ffb0eac922fbf8 Parents: 87e95f8 Author: Tim Armstrong <[email protected]> Authored: Tue Mar 7 18:12:15 2017 -0800 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Mar 16 02:10:02 2017 +0000 ---------------------------------------------------------------------- .../runtime/bufferpool/buffer-pool-internal.h | 12 +- be/src/runtime/bufferpool/buffer-pool-test.cc | 148 +++++++------------ be/src/runtime/bufferpool/buffer-pool.cc | 71 ++++++--- be/src/runtime/bufferpool/buffer-pool.h | 48 ++++-- be/src/runtime/bufferpool/reservation-tracker.h | 2 + be/src/runtime/bufferpool/suballocator-test.cc | 30 ++-- be/src/runtime/bufferpool/suballocator.cc | 2 +- 7 files changed, 163 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/buffer-pool-internal.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h index abc8930..04d76bb 100644 --- a/be/src/runtime/bufferpool/buffer-pool-internal.h +++ b/be/src/runtime/bufferpool/buffer-pool-internal.h @@ 
-76,6 +76,7 @@ #include "runtime/bufferpool/buffer-pool-counters.h" #include "runtime/bufferpool/buffer-pool.h" +#include "runtime/bufferpool/reservation-tracker.h" #include "util/condition-variable.h" namespace impala { @@ -84,7 +85,8 @@ namespace impala { class BufferPool::Client { public: Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group, const string& name, - RuntimeProfile* profile); + ReservationTracker* parent_reservation, MemTracker* mem_tracker, + int64_t reservation_limit, RuntimeProfile* profile); ~Client() { DCHECK_EQ(0, num_pages_); @@ -93,6 +95,9 @@ class BufferPool::Client { DCHECK_EQ(0, in_flight_write_pages_.size()); } + /// Release reservation for this client. + void Close() { reservation_.Close(); } + /// Add a new pinned page 'page' to the pinned pages list. 'page' must not be in any /// other lists. Neither the client's lock nor page->buffer_lock should be held by the /// caller. @@ -144,6 +149,7 @@ class BufferPool::Client { DCHECK(client_lock.mutex() == &lock_ && client_lock.owns_lock()); } + ReservationTracker* reservation() { return &reservation_; } const BufferPoolClientCounters& counters() const { return counters_; } bool spilling_enabled() const { return file_group_ != NULL; } @@ -183,6 +189,10 @@ class BufferPool::Client { /// A name identifying the client. const std::string name_; + /// The reservation tracker for the client. All pages pinned by the client count as + /// usage against 'reservation_'. + ReservationTracker reservation_; + /// The RuntimeProfile counters for this client, owned by the client's RuntimeProfile. /// All non-NULL. 
BufferPoolClientCounters counters_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/buffer-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc index b3dbfa9..0e0d384 100644 --- a/be/src/runtime/bufferpool/buffer-pool-test.cc +++ b/be/src/runtime/bufferpool/buffer-pool-test.cc @@ -16,6 +16,7 @@ // under the License. #include <cstdlib> +#include <limits> #include <string> #include <vector> #include <boost/bind.hpp> @@ -123,7 +124,6 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi int clients_per_query = 32; BufferPool::ClientHandle* clients[num_queries]; - ReservationTracker* client_reservations[num_queries]; for (int i = 0; i < num_queries; ++i) { int64_t query_id = QueryId(query_id_hi, i); @@ -140,7 +140,6 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi EXPECT_TRUE(query_reservation->IncreaseReservationToFit(initial_query_reservation)); clients[i] = new BufferPool::ClientHandle[clients_per_query]; - client_reservations[i] = new ReservationTracker[clients_per_query]; for (int j = 0; j < clients_per_query; ++j) { int64_t initial_client_reservation = @@ -148,13 +147,10 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi < initial_query_reservation % clients_per_query; // Reservation limit can be anything greater or equal to the initial reservation. 
int64_t client_reservation_limit = initial_client_reservation + rand() % 100000; - client_reservations[i][j].InitChildTracker( - NULL, query_reservation, NULL, client_reservation_limit); - EXPECT_TRUE( - client_reservations[i][j].IncreaseReservationToFit(initial_client_reservation)); string name = Substitute("Client $0 for query $1", j, query_id); - EXPECT_OK(pool->RegisterClient( - name, &client_reservations[i][j], NULL, NewProfile(), &clients[i][j])); + EXPECT_OK(pool->RegisterClient(name, NULL, query_reservation, NULL, + client_reservation_limit, NewProfile(), &clients[i][j])); + EXPECT_TRUE(clients[i][j].IncreaseReservationToFit(initial_client_reservation)); } for (int j = 0; j < clients_per_query; ++j) { @@ -167,11 +163,9 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi for (int j = 0; j < clients_per_query; ++j) { pool->DeregisterClient(&clients[i][j]); ASSERT_FALSE(clients[i][j].is_registered()); - client_reservations[i][j].Close(); } delete[] clients[i]; - delete[] client_reservations[i]; GetQueryReservationTracker(QueryId(query_id_hi, i))->Close(); } @@ -234,12 +228,10 @@ TEST_F(BufferPoolTest, PageCreation) { int64_t total_mem = 2 * 2 * max_page_len; global_reservations_.InitRootTracker(NULL, total_mem); BufferPool pool(TEST_BUFFER_LEN, total_mem); - ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); - client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem); - ASSERT_TRUE(client_tracker->IncreaseReservation(total_mem)); BufferPool::ClientHandle client; - ASSERT_OK( - pool.RegisterClient("test client", client_tracker, NULL, NewProfile(), &client)); + ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL, + total_mem, NewProfile(), &client)); + ASSERT_TRUE(client.IncreaseReservation(total_mem)); vector<BufferPool::PageHandle> handles(num_pages); @@ -247,7 +239,7 @@ TEST_F(BufferPoolTest, PageCreation) { for (int i = 0; i < num_pages; ++i) { 
int size_multiple = 1 << i; int64_t page_len = TEST_BUFFER_LEN * size_multiple; - int64_t used_before = client_tracker->GetUsedReservation(); + int64_t used_before = client.GetUsedReservation(); ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i])); ASSERT_TRUE(handles[i].is_open()); ASSERT_TRUE(handles[i].is_pinned()); @@ -256,19 +248,18 @@ TEST_F(BufferPoolTest, PageCreation) { ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data()); ASSERT_EQ(handles[i].len(), page_len); ASSERT_EQ(handles[i].buffer_handle()->len(), page_len); - ASSERT_EQ(client_tracker->GetUsedReservation(), used_before + page_len); + ASSERT_EQ(client.GetUsedReservation(), used_before + page_len); } // Close the handles and check memory consumption. for (int i = 0; i < num_pages; ++i) { - int64_t used_before = client_tracker->GetUsedReservation(); + int64_t used_before = client.GetUsedReservation(); int page_len = handles[i].len(); pool.DestroyPage(&client, &handles[i]); - ASSERT_EQ(client_tracker->GetUsedReservation(), used_before - page_len); + ASSERT_EQ(client.GetUsedReservation(), used_before - page_len); } pool.DeregisterClient(&client); - client_tracker->Close(); // All the reservations should be released at this point. 
ASSERT_EQ(global_reservations_.GetReservation(), 0); @@ -282,12 +273,10 @@ TEST_F(BufferPoolTest, BufferAllocation) { int64_t total_mem = 2 * 2 * max_buffer_len; global_reservations_.InitRootTracker(NULL, total_mem); BufferPool pool(TEST_BUFFER_LEN, total_mem); - ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); - client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem); - ASSERT_TRUE(client_tracker->IncreaseReservationToFit(total_mem)); BufferPool::ClientHandle client; - ASSERT_OK( - pool.RegisterClient("test client", client_tracker, NULL, NewProfile(), &client)); + ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL, + total_mem, NewProfile(), &client)); + ASSERT_TRUE(client.IncreaseReservationToFit(total_mem)); vector<BufferPool::BufferHandle> handles(num_buffers); @@ -295,24 +284,23 @@ TEST_F(BufferPoolTest, BufferAllocation) { for (int i = 0; i < num_buffers; ++i) { int size_multiple = 1 << i; int64_t buffer_len = TEST_BUFFER_LEN * size_multiple; - int64_t used_before = client_tracker->GetUsedReservation(); + int64_t used_before = client.GetUsedReservation(); ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i])); ASSERT_TRUE(handles[i].is_open()); ASSERT_TRUE(handles[i].data() != NULL); ASSERT_EQ(handles[i].len(), buffer_len); - ASSERT_EQ(client_tracker->GetUsedReservation(), used_before + buffer_len); + ASSERT_EQ(client.GetUsedReservation(), used_before + buffer_len); } // Close the handles and check memory consumption. 
for (int i = 0; i < num_buffers; ++i) { - int64_t used_before = client_tracker->GetUsedReservation(); + int64_t used_before = client.GetUsedReservation(); int buffer_len = handles[i].len(); pool.FreeBuffer(&client, &handles[i]); - ASSERT_EQ(client_tracker->GetUsedReservation(), used_before - buffer_len); + ASSERT_EQ(client.GetUsedReservation(), used_before - buffer_len); } pool.DeregisterClient(&client); - client_tracker->Close(); // All the reservations should be released at this point. ASSERT_EQ(global_reservations_.GetReservation(), 0); @@ -326,15 +314,12 @@ TEST_F(BufferPoolTest, BufferTransfer) { int64_t total_mem = num_clients * TEST_BUFFER_LEN; global_reservations_.InitRootTracker(NULL, total_mem); BufferPool pool(TEST_BUFFER_LEN, total_mem); - ReservationTracker client_trackers[num_clients]; BufferPool::ClientHandle clients[num_clients]; BufferPool::BufferHandle handles[num_clients]; for (int i = 0; i < num_clients; ++i) { - client_trackers[i].InitChildTracker( - NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN); - ASSERT_TRUE(client_trackers[i].IncreaseReservationToFit(TEST_BUFFER_LEN)); - ASSERT_OK(pool.RegisterClient( - "test client", &client_trackers[i], NULL, NewProfile(), &clients[i])); + ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL, + TEST_BUFFER_LEN, NewProfile(), &clients[i])); + ASSERT_TRUE(clients[i].IncreaseReservationToFit(TEST_BUFFER_LEN)); } // Transfer the page around between the clients repeatedly in a circle. @@ -347,19 +332,16 @@ TEST_F(BufferPoolTest, BufferTransfer) { &clients[next_client], &handles[next_client])); // Check that the transfer left things in a consistent state. 
ASSERT_FALSE(handles[client].is_open()); - ASSERT_EQ(0, client_trackers[client].GetUsedReservation()); + ASSERT_EQ(0, clients[client].GetUsedReservation()); ASSERT_TRUE(handles[next_client].is_open()); - ASSERT_EQ(TEST_BUFFER_LEN, client_trackers[next_client].GetUsedReservation()); + ASSERT_EQ(TEST_BUFFER_LEN, clients[next_client].GetUsedReservation()); // The same underlying buffer should be used. ASSERT_EQ(data, handles[next_client].data()); } } pool.FreeBuffer(&clients[0], &handles[0]); - for (int i = 0; i < num_clients; ++i) { - pool.DeregisterClient(&clients[i]); - client_trackers[i].Close(); - } + for (BufferPool::ClientHandle& client : clients) pool.DeregisterClient(&client); ASSERT_EQ(global_reservations_.GetReservation(), 0); global_reservations_.Close(); } @@ -371,13 +353,10 @@ TEST_F(BufferPoolTest, Pin) { int64_t child_reservation = TEST_BUFFER_LEN * 2; BufferPool pool(TEST_BUFFER_LEN, total_mem); global_reservations_.InitRootTracker(NULL, total_mem); - ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); - client_tracker->InitChildTracker( - NewProfile(), &global_reservations_, NULL, child_reservation); - ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation)); BufferPool::ClientHandle client; - ASSERT_OK(pool.RegisterClient( - "test client", client_tracker, NewFileGroup(), NewProfile(), &client)); + ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, + NULL, child_reservation, NewProfile(), &client)); + ASSERT_TRUE(client.IncreaseReservationToFit(child_reservation)); BufferPool::PageHandle handle1, handle2; @@ -416,7 +395,6 @@ TEST_F(BufferPoolTest, Pin) { pool.DestroyPage(&client, &double_handle); pool.DeregisterClient(&client); - client_tracker->Close(); } /// Creating a page or pinning without sufficient reservation should DCHECK. 
@@ -424,18 +402,15 @@ TEST_F(BufferPoolTest, PinWithoutReservation) { int64_t total_mem = TEST_BUFFER_LEN * 1024; BufferPool pool(TEST_BUFFER_LEN, total_mem); global_reservations_.InitRootTracker(NULL, total_mem); - ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); - client_tracker->InitChildTracker( - NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN); BufferPool::ClientHandle client; - ASSERT_OK( - pool.RegisterClient("test client", client_tracker, NULL, NewProfile(), &client)); + ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL, + TEST_BUFFER_LEN, NewProfile(), &client)); BufferPool::PageHandle handle; IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle), ""); // Should succeed after increasing reservation. - ASSERT_TRUE(client_tracker->IncreaseReservationToFit(TEST_BUFFER_LEN)); + ASSERT_TRUE(client.IncreaseReservationToFit(TEST_BUFFER_LEN)); ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle)); // But we can't pin again. 
@@ -443,7 +418,6 @@ TEST_F(BufferPoolTest, PinWithoutReservation) { pool.DestroyPage(&client, &handle); pool.DeregisterClient(&client); - client_tracker->Close(); } TEST_F(BufferPoolTest, ExtractBuffer) { @@ -452,13 +426,10 @@ TEST_F(BufferPoolTest, ExtractBuffer) { int64_t child_reservation = TEST_BUFFER_LEN * 2; BufferPool pool(TEST_BUFFER_LEN, total_mem); global_reservations_.InitRootTracker(NULL, total_mem); - ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); - client_tracker->InitChildTracker( - NewProfile(), &global_reservations_, NULL, child_reservation); - ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation)); BufferPool::ClientHandle client; - ASSERT_OK(pool.RegisterClient( - "test client", client_tracker, NewFileGroup(), NewProfile(), &client)); + ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, + NULL, child_reservation, NewProfile(), &client)); + ASSERT_TRUE(client.IncreaseReservationToFit(child_reservation)); BufferPool::PageHandle page; BufferPool::BufferHandle buffer; @@ -472,24 +443,24 @@ TEST_F(BufferPoolTest, ExtractBuffer) { ASSERT_TRUE(buffer.is_open()); ASSERT_EQ(len, buffer.len()); ASSERT_EQ(page_data, buffer.data()); - ASSERT_EQ(len, client_tracker->GetUsedReservation()); + ASSERT_EQ(len, client.GetUsedReservation()); pool.FreeBuffer(&client, &buffer); - ASSERT_EQ(0, client_tracker->GetUsedReservation()); + ASSERT_EQ(0, client.GetUsedReservation()); } // Test that ExtractBuffer() accounts correctly for pin count > 1. 
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page)); uint8_t* page_data = page.data(); ASSERT_OK(pool.Pin(&client, &page)); - ASSERT_EQ(TEST_BUFFER_LEN * 2, client_tracker->GetUsedReservation()); + ASSERT_EQ(TEST_BUFFER_LEN * 2, client.GetUsedReservation()); pool.ExtractBuffer(&client, &page, &buffer); - ASSERT_EQ(TEST_BUFFER_LEN, client_tracker->GetUsedReservation()); + ASSERT_EQ(TEST_BUFFER_LEN, client.GetUsedReservation()); ASSERT_FALSE(page.is_open()); ASSERT_TRUE(buffer.is_open()); ASSERT_EQ(TEST_BUFFER_LEN, buffer.len()); ASSERT_EQ(page_data, buffer.data()); pool.FreeBuffer(&client, &buffer); - ASSERT_EQ(0, client_tracker->GetUsedReservation()); + ASSERT_EQ(0, client.GetUsedReservation()); // Test that ExtractBuffer() DCHECKs for unpinned pages. ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page)); @@ -498,7 +469,6 @@ TEST_F(BufferPoolTest, ExtractBuffer) { pool.DestroyPage(&client, &page); pool.DeregisterClient(&client); - client_tracker->Close(); } // Test concurrent creation and destruction of pages. 
@@ -534,22 +504,18 @@ TEST_F(BufferPoolTest, ConcurrentPageCreation) { void BufferPoolTest::CreatePageLoop(BufferPool* pool, TmpFileMgr::FileGroup* file_group, ReservationTracker* parent_tracker, int num_ops) { - ReservationTracker client_tracker; - client_tracker.InitChildTracker(NewProfile(), parent_tracker, NULL, TEST_BUFFER_LEN); BufferPool::ClientHandle client; - ASSERT_OK(pool->RegisterClient( - "test client", &client_tracker, file_group, NewProfile(), &client)); + ASSERT_OK(pool->RegisterClient("test client", file_group, parent_tracker, NULL, + TEST_BUFFER_LEN, NewProfile(), &client)); + ASSERT_TRUE(client.IncreaseReservation(TEST_BUFFER_LEN)); for (int i = 0; i < num_ops; ++i) { BufferPool::PageHandle handle; - ASSERT_TRUE(client_tracker.IncreaseReservation(TEST_BUFFER_LEN)); ASSERT_OK(pool->CreatePage(&client, TEST_BUFFER_LEN, &handle)); pool->Unpin(&client, &handle); ASSERT_OK(pool->Pin(&client, &handle)); pool->DestroyPage(&client, &handle); - client_tracker.DecreaseReservation(TEST_BUFFER_LEN); } pool->DeregisterClient(&client); - client_tracker.Close(); } /// Test that DCHECK fires when trying to unpin a page with spilling disabled. 
@@ -559,9 +525,9 @@ TEST_F(BufferPoolTest, SpillingDisabledDcheck) { BufferPool::PageHandle handle; BufferPool::ClientHandle client; - ASSERT_OK(pool.RegisterClient( - "test client", &global_reservations_, NULL, NewProfile(), &client)); - ASSERT_TRUE(global_reservations_.IncreaseReservation(2 * TEST_BUFFER_LEN)); + ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL, + numeric_limits<int64_t>::max(), NewProfile(), &client)); + ASSERT_TRUE(client.IncreaseReservation(2 * TEST_BUFFER_LEN)); ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle)); ASSERT_OK(pool.Pin(&client, &handle)); @@ -582,9 +548,9 @@ TEST_F(BufferPoolTest, EvictPageSameClient) { BufferPool::PageHandle handle1, handle2; BufferPool::ClientHandle client; - ASSERT_OK(pool.RegisterClient( - "test client", &global_reservations_, NewFileGroup(), NewProfile(), &client)); - ASSERT_TRUE(global_reservations_.IncreaseReservation(TEST_BUFFER_LEN)); + ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, + NULL, TEST_BUFFER_LEN, NewProfile(), &client)); + ASSERT_TRUE(client.IncreaseReservation(TEST_BUFFER_LEN)); ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1)); // Do not have enough reservations because we pinned the page. 
@@ -607,9 +573,9 @@ TEST_F(BufferPoolTest, EvictPageDifferentSizes) { BufferPool::PageHandle handle1, handle2; BufferPool::ClientHandle client; - ASSERT_OK(pool.RegisterClient( - "test client", &global_reservations_, NewFileGroup(), NewProfile(), &client)); - ASSERT_TRUE(global_reservations_.IncreaseReservation(2 * TEST_BUFFER_LEN)); + ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, + NULL, TOTAL_BYTES, NewProfile(), &client)); + ASSERT_TRUE(client.IncreaseReservation(2 * TEST_BUFFER_LEN)); ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1)); pool.Unpin(&client, &handle1); @@ -635,14 +601,11 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) { global_reservations_.InitRootTracker(NULL, TOTAL_BYTES); BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES); - ReservationTracker client_reservations[NUM_CLIENTS]; BufferPool::ClientHandle clients[NUM_CLIENTS]; for (int i = 0; i < NUM_CLIENTS; ++i) { - client_reservations[i].InitChildTracker( - NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN); - ASSERT_TRUE(client_reservations[i].IncreaseReservation(TEST_BUFFER_LEN)); - ASSERT_OK(pool.RegisterClient(Substitute("test client $0", i), - &client_reservations[i], NewFileGroup(), NewProfile(), &clients[i])); + ASSERT_OK(pool.RegisterClient(Substitute("test client $0", i), NewFileGroup(), + &global_reservations_, NULL, TEST_BUFFER_LEN, NewProfile(), &clients[i])); + ASSERT_TRUE(clients[i].IncreaseReservation(TEST_BUFFER_LEN)); } // Create a pinned and unpinned page for the first client. 
@@ -668,10 +631,7 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) { pool.DestroyPage(&clients[0], &handle1); pool.DestroyPage(&clients[0], &handle2); pool.FreeBuffer(&clients[1], &buffer); - for (int i = 0; i < NUM_CLIENTS; ++i) { - pool.DeregisterClient(&clients[i]); - client_reservations[i].Close(); - } + for (BufferPool::ClientHandle& client : clients) pool.DeregisterClient(&client); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/buffer-pool.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc index 7a026d6..842501f 100644 --- a/be/src/runtime/bufferpool/buffer-pool.cc +++ b/be/src/runtime/bufferpool/buffer-pool.cc @@ -24,7 +24,6 @@ #include "common/names.h" #include "gutil/strings/substitute.h" #include "runtime/bufferpool/buffer-allocator.h" -#include "runtime/bufferpool/reservation-tracker.h" #include "util/bit-util.h" #include "util/runtime-profile-counters.h" #include "util/uid-util.h" @@ -129,19 +128,19 @@ BufferPool::~BufferPool() { DCHECK_EQ(0, clean_pages_.size()); } -Status BufferPool::RegisterClient(const string& name, ReservationTracker* reservation, - TmpFileMgr::FileGroup* file_group, RuntimeProfile* profile, ClientHandle* client) { +Status BufferPool::RegisterClient(const string& name, TmpFileMgr::FileGroup* file_group, + ReservationTracker* parent_reservation, MemTracker* mem_tracker, + int64_t reservation_limit, RuntimeProfile* profile, ClientHandle* client) { DCHECK(!client->is_registered()); - DCHECK(reservation != NULL); - client->reservation_ = reservation; - client->impl_ = new Client(this, file_group, name, profile); + DCHECK(parent_reservation != NULL); + client->impl_ = new Client(this, file_group, name, parent_reservation, mem_tracker, + reservation_limit, profile); return Status::OK(); } void BufferPool::DeregisterClient(ClientHandle* client) { if 
(!client->is_registered()) return; - client->reservation_->Close(); // Will DCHECK if any remaining buffers or pinned pages. - client->reservation_ = NULL; + client->impl_->Close(); // Will DCHECK if any remaining buffers or pinned pages. delete client->impl_; // Will DCHECK if there are any remaining pages. client->impl_ = NULL; } @@ -190,7 +189,7 @@ Status BufferPool::Pin(ClientHandle* client, PageHandle* handle) { } // Update accounting last to avoid complicating the error return path above. ++page->pin_count; - client->reservation_->AllocateFrom(page->len); + client->impl_->reservation()->AllocateFrom(page->len); return Status::OK(); } @@ -201,10 +200,11 @@ void BufferPool::Unpin(ClientHandle* client, PageHandle* handle) { // If handle is pinned, we can assume that the page itself is pinned. DCHECK(handle->is_pinned()); Page* page = handle->page_; - client->reservation_->ReleaseTo(page->len); + ReservationTracker* reservation = client->impl_->reservation(); + reservation->ReleaseTo(page->len); if (--page->pin_count > 0) return; - client->impl_->MoveToDirtyUnpinned(client->reservation_->GetUnusedReservation(), page); + client->impl_->MoveToDirtyUnpinned(reservation->GetUnusedReservation(), page); COUNTER_ADD(client->impl_->counters().total_unpinned_bytes, handle->len()); COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, handle->len()); } @@ -225,8 +225,9 @@ void BufferPool::ExtractBuffer( Status BufferPool::AllocateBuffer( ClientHandle* client, int64_t len, BufferHandle* handle) { - RETURN_IF_ERROR(client->impl_->CleanPagesBeforeAllocation(client->reservation_, len)); - client->reservation_->AllocateFrom(len); + ReservationTracker* reservation = client->impl_->reservation(); + RETURN_IF_ERROR(client->impl_->CleanPagesBeforeAllocation(reservation, len)); + reservation->AllocateFrom(len); return AllocateBufferInternal(client, len, handle); } @@ -258,7 +259,7 @@ Status BufferPool::AllocateBufferInternal( void BufferPool::FreeBuffer(ClientHandle* client, 
BufferHandle* handle) { if (!handle->is_open()) return; // Should be idempotent. DCHECK_EQ(client, handle->client_); - client->reservation_->ReleaseTo(handle->len_); + client->impl_->reservation()->ReleaseTo(handle->len_); FreeBufferInternal(handle); } @@ -277,8 +278,8 @@ Status BufferPool::TransferBuffer(ClientHandle* src_client, BufferHandle* src, DCHECK_NE(src, dst); DCHECK_NE(src_client, dst_client); - dst_client->reservation_->AllocateFrom(src->len()); - src_client->reservation_->ReleaseTo(src->len()); + dst_client->impl_->reservation()->AllocateFrom(src->len()); + src_client->impl_->reservation()->ReleaseTo(src->len()); *dst = std::move(*src); dst->client_ = dst_client; return Status::OK(); @@ -348,14 +349,37 @@ Status BufferPool::EvictCleanPages(int64_t bytes_to_evict) { return Status::OK(); } +bool BufferPool::ClientHandle::IncreaseReservation(int64_t bytes) { + return impl_->reservation()->IncreaseReservation(bytes); +} + +bool BufferPool::ClientHandle::IncreaseReservationToFit(int64_t bytes) { + return impl_->reservation()->IncreaseReservationToFit(bytes); +} + +int64_t BufferPool::ClientHandle::GetReservation() const { + return impl_->reservation()->GetReservation(); +} + +int64_t BufferPool::ClientHandle::GetUsedReservation() const { + return impl_->reservation()->GetUsedReservation(); +} + +int64_t BufferPool::ClientHandle::GetUnusedReservation() const { + return impl_->reservation()->GetUnusedReservation(); +} + BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group, - const string& name, RuntimeProfile* profile) + const string& name, ReservationTracker* parent_reservation, MemTracker* mem_tracker, + int64_t reservation_limit, RuntimeProfile* profile) : pool_(pool), file_group_(file_group), name_(name), num_pages_(0), dirty_unpinned_bytes_(0), in_flight_write_bytes_(0) { + reservation_.InitChildTracker( + profile, parent_reservation, mem_tracker, reservation_limit); counters_.get_buffer_time = ADD_TIMER(profile, 
"BufferPoolGetBufferTime"); counters_.read_wait_time = ADD_TIMER(profile, "BufferPoolReadIoWaitTime"); counters_.read_io_ops = ADD_COUNTER(profile, "BufferPoolReadIoOps", TUnit::UNIT); @@ -476,8 +500,8 @@ Status BufferPool::Client::MoveEvictedToPinned( unique_lock<mutex>* client_lock, ClientHandle* client, PageHandle* handle) { Page* page = handle->page_; DCHECK(!page->buffer.is_open()); - RETURN_IF_ERROR( - CleanPagesBeforeAllocationLocked(client_lock, client->reservation_, page->len)); + RETURN_IF_ERROR(CleanPagesBeforeAllocationLocked( + client_lock, client->impl_->reservation(), page->len)); // Don't hold any locks while allocating or reading back the data. It is safe to modify // the page's buffer handle without holding any locks because no concurrent operations @@ -620,9 +644,9 @@ string BufferPool::Client::DebugString() { lock_guard<mutex> lock(lock_); stringstream ss; ss << Substitute("<BufferPool::Client> $0 name: $1 write_status: $2 num_pages: $3 " - "dirty_unpinned_bytes: $4 in_flight_write_bytes: $5", + "dirty_unpinned_bytes: $4 in_flight_write_bytes: $5 reservation: {$6}", this, name_, write_status_.GetDetail(), num_pages_, dirty_unpinned_bytes_, - in_flight_write_bytes_); + in_flight_write_bytes_, reservation_.DebugString()); ss << "\n " << pinned_pages_.size() << " pinned pages: "; pinned_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1)); ss << "\n " << dirty_unpinned_pages_.size() << " dirty unpinned pages: "; @@ -634,9 +658,8 @@ string BufferPool::Client::DebugString() { string BufferPool::ClientHandle::DebugString() const { if (is_registered()) { - return Substitute("<BufferPool::Client> $0 reservation: {$1} " - "internal state: {$2}", - this, reservation_->DebugString(), impl_->DebugString()); + return Substitute( + "<BufferPool::Client> $0 internal state: {$1}", this, impl_->DebugString()); } else { return Substitute("<BufferPool::ClientHandle> $0 UNREGISTERED", this); } 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/buffer-pool.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h index 4e62b3b..e27e645 100644 --- a/be/src/runtime/bufferpool/buffer-pool.h +++ b/be/src/runtime/bufferpool/buffer-pool.h @@ -161,17 +161,23 @@ class BufferPool : public CacheLineAligned { /// Register a client. Returns an error status and does not register the client if the /// arguments are invalid. 'name' is an arbitrary name used to identify the client in - /// any errors messages or logging. Counters for this client are added to the (non-NULL) + /// any errors messages or logging. If 'file_group' is non-NULL, it is used to allocate + /// scratch space to write unpinned pages to disk. If it is NULL, unpinning of pages is + /// not allowed for this client. Counters for this client are added to the (non-NULL) /// 'profile'. 'client' is the client to register. 'client' should not already be - /// registered. If 'file_group' is non-NULL, it is used to allocate scratch space to - /// write unpinned pages to disk. If it is NULL, unpinning of pages is not allowed for - /// this client. - Status RegisterClient(const std::string& name, ReservationTracker* reservation, - TmpFileMgr::FileGroup* file_group, RuntimeProfile* profile, + /// registered. + /// + /// The client's reservation is created as a child of 'parent_reservation' with limit + /// 'reservation_limit' and associated with MemTracker 'mem_tracker'. The initial + /// reservation is 0 bytes. + Status RegisterClient(const std::string& name, TmpFileMgr::FileGroup* file_group, + ReservationTracker* parent_reservation, MemTracker* mem_tracker, + int64_t reservation_limit, RuntimeProfile* profile, ClientHandle* client) WARN_UNUSED_RESULT; /// Deregister 'client' if it is registered. 
All pages must be destroyed and buffers - /// must be freed for the client before calling this. Idempotent. + /// must be freed for the client before calling this. Releases any reservation that + /// belongs to the client. Idempotent. void DeregisterClient(ClientHandle* client); /// Create a new page of 'len' bytes with pin count 1. 'len' must be a page length @@ -316,12 +322,27 @@ class BufferPool : public CacheLineAligned { /// Client methods or BufferPool methods with the Client as an argument is not supported. class BufferPool::ClientHandle { public: - ClientHandle() : reservation_(NULL) {} + ClientHandle() : impl_(NULL) {} /// Client must be deregistered. ~ClientHandle() { DCHECK(!is_registered()); } - bool is_registered() const { return reservation_ != NULL; } - ReservationTracker* reservation() { return reservation_; } + /// Request to increase reservation for this client by 'bytes' by calling + /// ReservationTracker::IncreaseReservation(). Returns true if the reservation was + /// successfully increased. + bool IncreaseReservation(int64_t bytes) WARN_UNUSED_RESULT; + + /// Tries to ensure that 'bytes' of unused reservation is available for this client + /// to use by calling ReservationTracker::IncreaseReservationToFit(). Returns true + /// if successful, after which 'bytes' can be used. + bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT; + + /// Accessors for this client's reservation corresponding to the identically-named + /// methods in ReservationTracker. + int64_t GetReservation() const; + int64_t GetUsedReservation() const; + int64_t GetUnusedReservation() const; + + bool is_registered() const { return impl_ != NULL; } std::string DebugString() const; @@ -329,11 +350,8 @@ class BufferPool::ClientHandle { friend class BufferPool; DISALLOW_COPY_AND_ASSIGN(ClientHandle); - /// The reservation tracker for the client. NULL means the client isn't registered. - /// All pages pinned by the client count as usage against 'reservation_'. 
- ReservationTracker* reservation_; - - /// Internal state for the client. Owned by BufferPool. + /// Internal state for the client. NULL means the client isn't registered. + /// Owned by BufferPool. Client* impl_; }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/reservation-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h index 9edd37f..3d40fcc 100644 --- a/be/src/runtime/bufferpool/reservation-tracker.h +++ b/be/src/runtime/bufferpool/reservation-tracker.h @@ -108,6 +108,8 @@ class ReservationTracker { /// If the tracker is initialized, deregister the ReservationTracker from its parent, /// relinquishing all this tracker's reservation. All of the reservation must be unused /// and all the tracker's children must be closed before calling this method. + /// TODO: decide on and implement policy for how far to release the reservation up + /// the tree. Currently the reservation is released all the way to the root. void Close(); /// Request to increase reservation by 'bytes'. The request is either granted in http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/suballocator-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/suballocator-test.cc b/be/src/runtime/bufferpool/suballocator-test.cc index 01f0ea6..13bc597 100644 --- a/be/src/runtime/bufferpool/suballocator-test.cc +++ b/be/src/runtime/bufferpool/suballocator-test.cc @@ -17,6 +17,7 @@ #include <algorithm> #include <cstdlib> +#include <limits> #include <random> #include <string> #include <vector> @@ -85,11 +86,11 @@ class SuballocatorTest : public ::testing::Test { /// Register a client with 'buffer_pool_'. The client is automatically deregistered /// and freed at the end of the test. 
void RegisterClient( - ReservationTracker* reservation, BufferPool::ClientHandle** client) { + ReservationTracker* parent_reservation, BufferPool::ClientHandle** client) { clients_.push_back(make_unique<BufferPool::ClientHandle>()); *client = clients_.back().get(); - ASSERT_OK(buffer_pool_->RegisterClient( - "test client", reservation, NULL, profile(), *client)); + ASSERT_OK(buffer_pool_->RegisterClient("test client", NULL, parent_reservation, NULL, + numeric_limits<int64_t>::max(), profile(), *client)); } /// Assert that the memory for all of the suballocations is writable and disjoint by @@ -104,8 +105,8 @@ class SuballocatorTest : public ::testing::Test { allocs->clear(); } - static void ExpectReservationUnused(ReservationTracker& reservation) { - EXPECT_EQ(reservation.GetUsedReservation(), 0) << reservation.DebugString(); + static void ExpectReservationUnused(BufferPool::ClientHandle* client) { + EXPECT_EQ(client->GetUsedReservation(), 0) << client->DebugString(); } RuntimeProfile* profile() { return profile_.get(); } @@ -165,10 +166,10 @@ TEST_F(SuballocatorTest, SameSizeAllocations) { AssertMemoryValid(allocs); // Check that reservation usage matches the amount allocated. - EXPECT_EQ(global_reservation_.GetUsedReservation(), allocated_mem) + EXPECT_EQ(client->GetUsedReservation(), allocated_mem) << global_reservation_.DebugString(); FreeAllocations(&allocator, &allocs); - ExpectReservationUnused(global_reservation_); + ExpectReservationUnused(client); } /// Check behaviour of zero-length allocation. @@ -185,7 +186,7 @@ TEST_F(SuballocatorTest, ZeroLengthAllocation) { ASSERT_TRUE(alloc != nullptr) << global_reservation_.DebugString(); EXPECT_EQ(alloc->len(), Suballocator::MIN_ALLOCATION_BYTES); allocator.Free(move(alloc)); - ExpectReservationUnused(global_reservation_); + ExpectReservationUnused(client); } /// Check behaviour of out-of-range allocation. 
@@ -203,7 +204,7 @@ TEST_F(SuballocatorTest, OutOfRangeAllocations) { // Too-large allocations fail gracefully. ASSERT_FALSE(allocator.Allocate(Suballocator::MAX_ALLOCATION_BYTES + 1, &alloc).ok()) << global_reservation_.DebugString(); - ExpectReservationUnused(global_reservation_); + ExpectReservationUnused(client); } /// Basic test to make sure that non-power-of-two suballocations are handled as expected @@ -235,14 +236,13 @@ TEST_F(SuballocatorTest, NonPowerOfTwoAllocations) { // Check that it was rounded up to a power-of-two. EXPECT_EQ(alloc->len(), max(Suballocator::MIN_ALLOCATION_BYTES, BitUtil::RoundUpToPowerOfTwo(alloc_size))); - EXPECT_EQ( - max(TEST_BUFFER_LEN, alloc->len()), global_reservation_.GetUsedReservation()) + EXPECT_EQ(max(TEST_BUFFER_LEN, alloc->len()), client->GetUsedReservation()) << global_reservation_.DebugString(); memset(alloc->data(), 0, alloc->len()); // Check memory is writable. allocator.Free(move(alloc)); } - ExpectReservationUnused(global_reservation_); + ExpectReservationUnused(client); } /// Test that simulates hash table's patterns of doubling suballocations and validates @@ -292,12 +292,12 @@ TEST_F(SuballocatorTest, DoublingAllocations) { // coalesced two buddies of curr_alloc_size / 2) and one buffer with only // 'curr_alloc_size' bytes in use (if an Allocate() call couldn't recycle memory and // had to allocate a new buffer). - EXPECT_LE(global_reservation_.GetUsedReservation(), + EXPECT_LE(client->GetUsedReservation(), TEST_BUFFER_LEN + max(TEST_BUFFER_LEN, curr_alloc_size * NUM_ALLOCS)); } // Check that reservation usage behaves as expected. FreeAllocations(&allocator, &allocs); - ExpectReservationUnused(global_reservation_); + ExpectReservationUnused(client); } /// Do some randomised testing of the allocator. Simulate some interesting patterns with @@ -352,7 +352,7 @@ TEST_F(SuballocatorTest, RandomAllocations) { } // Check that memory is released when suballocations are freed. 
FreeAllocations(&allocator, &allocs); - ExpectReservationUnused(global_reservation_); + ExpectReservationUnused(client); } void SuballocatorTest::AssertMemoryValid( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/suballocator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/suballocator.cc b/be/src/runtime/bufferpool/suballocator.cc index a4835a4..a6ab1ff 100644 --- a/be/src/runtime/bufferpool/suballocator.cc +++ b/be/src/runtime/bufferpool/suballocator.cc @@ -89,7 +89,7 @@ int Suballocator::ComputeListIndex(int64_t bytes) const { Status Suballocator::AllocateBuffer(int64_t bytes, unique_ptr<Suballocation>* result) { DCHECK_LE(bytes, MAX_ALLOCATION_BYTES); const int64_t buffer_len = max(min_buffer_len_, BitUtil::RoundUpToPowerOfTwo(bytes)); - if (!client_->reservation()->IncreaseReservationToFit(buffer_len)) { + if (!client_->IncreaseReservationToFit(buffer_len)) { *result = nullptr; return Status::OK(); }
