IMPALA-3201: in-memory buffer pool implementation. This patch implements basic in-memory buffer management, with reservations managed by ReservationTrackers.
Locks are fine-grained so that the buffer pool can scale to many concurrent queries. Includes basic tests for buffer pool setup, allocation and reservations. Change-Id: I4bda61c31cc02d26bc83c3d458c835b0984b86a0 Reviewed-on: http://gerrit.cloudera.org:8080/4070 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/241c7e01 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/241c7e01 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/241c7e01 Branch: refs/heads/master Commit: 241c7e01978f180012453b0a4ff6d061ca6d5093 Parents: 9cee2b5 Author: Tim Armstrong <[email protected]> Authored: Fri Aug 19 17:41:45 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Wed Sep 28 23:38:20 2016 +0000 ---------------------------------------------------------------------- be/src/bufferpool/CMakeLists.txt | 1 + be/src/bufferpool/buffer-allocator.cc | 39 +++ be/src/bufferpool/buffer-allocator.h | 48 ++++ be/src/bufferpool/buffer-pool-test.cc | 381 +++++++++++++++++++++++++++-- be/src/bufferpool/buffer-pool.cc | 361 ++++++++++++++++++++++++++- be/src/bufferpool/buffer-pool.h | 159 +++++++++--- be/src/util/internal-queue.h | 11 + common/thrift/generate_error_codes.py | 4 +- 8 files changed, 934 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/CMakeLists.txt b/be/src/bufferpool/CMakeLists.txt index 69c4e4a..2f056e0 100644 --- a/be/src/bufferpool/CMakeLists.txt +++ b/be/src/bufferpool/CMakeLists.txt @@ -22,6 +22,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool") set(EXECUTABLE_OUTPUT_PATH 
"${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool") add_library(BufferPool + buffer-allocator.cc buffer-pool.cc reservation-tracker.cc ) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-allocator.cc ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/buffer-allocator.cc b/be/src/bufferpool/buffer-allocator.cc new file mode 100644 index 0000000..27bd788 --- /dev/null +++ b/be/src/bufferpool/buffer-allocator.cc @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "bufferpool/buffer-allocator.h" + +#include "util/bit-util.h" + +namespace impala { + +BufferAllocator::BufferAllocator(int64_t min_buffer_len) + : min_buffer_len_(min_buffer_len) {} + +Status BufferAllocator::Allocate(int64_t len, uint8_t** buffer) { + DCHECK_GE(len, min_buffer_len_); + DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len)); + + *buffer = reinterpret_cast<uint8_t*>(malloc(len)); + if (*buffer == NULL) return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, len); + return Status::OK(); +} + +void BufferAllocator::Free(uint8_t* buffer, int64_t len) { + free(buffer); +} +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-allocator.h ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/buffer-allocator.h b/be/src/bufferpool/buffer-allocator.h new file mode 100644 index 0000000..c3e0c70 --- /dev/null +++ b/be/src/bufferpool/buffer-allocator.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_BUFFER_ALLOCATOR_H +#define IMPALA_BUFFER_ALLOCATOR_H + +#include "common/status.h" + +namespace impala { + +/// The underlying memory allocator for the buffer pool. 
All buffers are allocated through +/// the BufferPool's BufferAllocator. The allocator only handles allocating buffers that +/// are power-of-two multiples of the minimum buffer length. +/// +/// TODO: +/// * Allocate memory with mmap() instead of malloc(). +/// * Implement free lists in the allocator or external to the allocator. +class BufferAllocator { + public: + BufferAllocator(int64_t min_buffer_len); + + /// Allocate memory for a buffer of 'len' bytes. 'len' must be a power-of-two multiple + /// of the minimum buffer length. + Status Allocate(int64_t len, uint8_t** buffer); + + /// Free the memory for a previously-allocated buffer. + void Free(uint8_t* buffer, int64_t len); + + private: + const int64_t min_buffer_len_; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/buffer-pool-test.cc b/be/src/bufferpool/buffer-pool-test.cc index cdb163d..d45f017 100644 --- a/be/src/bufferpool/buffer-pool-test.cc +++ b/be/src/bufferpool/buffer-pool-test.cc @@ -28,6 +28,7 @@ #include "bufferpool/reservation-tracker.h" #include "common/init.h" #include "common/object-pool.h" +#include "testutil/death-test-util.h" #include "testutil/test-macros.h" #include "common/names.h" @@ -51,13 +52,16 @@ class BufferPoolTest : public ::testing::Test { } /// The minimum buffer size used in most tests. - const static int64_t TEST_PAGE_LEN = 1024; + const static int64_t TEST_BUFFER_LEN = 1024; /// Test helper to simulate registering then deregistering a number of queries with /// the given initial reservation and reservation limit. void RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit); + /// Create and destroy a page multiple times. 
+ void CreatePageLoop(BufferPool* pool, ReservationTracker* parent_tracker, int num_ops); + protected: static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 32 | lo; } @@ -85,7 +89,7 @@ class BufferPoolTest : public ::testing::Test { SpinLock query_reservations_lock_; }; -const int64_t BufferPoolTest::TEST_PAGE_LEN; +const int64_t BufferPoolTest::TEST_BUFFER_LEN; void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit) { @@ -156,7 +160,7 @@ TEST_F(BufferPoolTest, BasicRegistration) { int64_t total_mem = sum_initial_reservations * num_concurrent_queries; global_reservations_.InitRootTracker(NewProfile(), total_mem); - BufferPool pool(TEST_PAGE_LEN, total_mem); + BufferPool pool(TEST_BUFFER_LEN, total_mem); RegisterQueriesAndClients( &pool, 0, num_concurrent_queries, sum_initial_reservations, reservation_limit); @@ -179,7 +183,7 @@ TEST_F(BufferPoolTest, ConcurrentRegistration) { int64_t total_mem = num_concurrent_queries * sum_initial_reservations; global_reservations_.InitRootTracker(NewProfile(), total_mem); - BufferPool pool(TEST_PAGE_LEN, total_mem); + BufferPool pool(TEST_BUFFER_LEN, total_mem); // Launch threads, each with a different set of query IDs. thread_group workers; @@ -195,32 +199,355 @@ TEST_F(BufferPoolTest, ConcurrentRegistration) { global_reservations_.Close(); } -/// Test that reservation setup fails if the initial buffers cannot be fulfilled. -TEST_F(BufferPoolTest, QueryReservationsUnfulfilled) { - Status status; - int num_queries = 128; - int64_t reservation_per_query = 128; - // Won't be able to fulfill initial reservation for last query. - int64_t total_mem = num_queries * reservation_per_query - 1; - global_reservations_.InitRootTracker(NewProfile(), total_mem); +/// Test basic page handle creation. 
+TEST_F(BufferPoolTest, PageCreation) { + // Allocate many pages, each a power-of-two multiple of the minimum page length. + int num_pages = 16; + int64_t max_page_len = TEST_BUFFER_LEN << (num_pages - 1); + int64_t total_mem = 2 * 2 * max_page_len; + global_reservations_.InitRootTracker(NULL, total_mem); + BufferPool pool(TEST_BUFFER_LEN, total_mem); + ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); + client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem); + ASSERT_TRUE(client_tracker->IncreaseReservation(total_mem)); + BufferPool::Client client; + ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client)); + + vector<BufferPool::PageHandle> handles(num_pages); + + // Create pages of various valid sizes. + for (int i = 0; i < num_pages; ++i) { + int size_multiple = 1 << i; + int64_t page_len = TEST_BUFFER_LEN * size_multiple; + int64_t used_before = client_tracker->GetUsedReservation(); + ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i])); + ASSERT_TRUE(handles[i].is_open()); + ASSERT_TRUE(handles[i].is_pinned()); + ASSERT_TRUE(handles[i].buffer_handle() != NULL); + ASSERT_TRUE(handles[i].data() != NULL); + ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data()); + ASSERT_EQ(handles[i].len(), page_len); + ASSERT_EQ(handles[i].buffer_handle()->len(), page_len); + DCHECK_EQ(client_tracker->GetUsedReservation(), used_before + page_len); + } - for (int i = 0; i < num_queries; ++i) { - ReservationTracker* query_tracker = GetQueryReservationTracker(i); - query_tracker->InitChildTracker( - NewProfile(), &global_reservations_, NULL, 2 * reservation_per_query); - bool got_initial_reservation = - query_tracker->IncreaseReservationToFit(reservation_per_query); - if (i < num_queries - 1) { - ASSERT_TRUE(got_initial_reservation); - } else { - ASSERT_FALSE(got_initial_reservation); - - // Getting the initial reservation should succeed after freeing up buffers from - // other query. 
- GetQueryReservationTracker(i - 1)->Close(); - ASSERT_TRUE(query_tracker->IncreaseReservationToFit(reservation_per_query)); + // Close the handles and check memory consumption. + for (int i = 0; i < num_pages; ++i) { + int64_t used_before = client_tracker->GetUsedReservation(); + int page_len = handles[i].len(); + pool.DestroyPage(&client, &handles[i]); + DCHECK_EQ(client_tracker->GetUsedReservation(), used_before - page_len); + } + + pool.DeregisterClient(&client); + client_tracker->Close(); + + // All the reservations should be released at this point. + DCHECK_EQ(global_reservations_.GetReservation(), 0); + global_reservations_.Close(); +} + +TEST_F(BufferPoolTest, BufferAllocation) { + // Allocate many buffers, each a power-of-two multiple of the minimum buffer length. + int num_buffers = 16; + int64_t max_buffer_len = TEST_BUFFER_LEN << (num_buffers - 1); + int64_t total_mem = 2 * 2 * max_buffer_len; + global_reservations_.InitRootTracker(NULL, total_mem); + BufferPool pool(TEST_BUFFER_LEN, total_mem); + ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); + client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem); + ASSERT_TRUE(client_tracker->IncreaseReservationToFit(total_mem)); + BufferPool::Client client; + ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client)); + + vector<BufferPool::BufferHandle> handles(num_buffers); + + // Create buffers of various valid sizes. + for (int i = 0; i < num_buffers; ++i) { + int size_multiple = 1 << i; + int64_t buffer_len = TEST_BUFFER_LEN * size_multiple; + int64_t used_before = client_tracker->GetUsedReservation(); + ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i])); + ASSERT_TRUE(handles[i].is_open()); + ASSERT_TRUE(handles[i].data() != NULL); + ASSERT_EQ(handles[i].len(), buffer_len); + DCHECK_EQ(client_tracker->GetUsedReservation(), used_before + buffer_len); + } + + // Close the handles and check memory consumption. 
+ for (int i = 0; i < num_buffers; ++i) { + int64_t used_before = client_tracker->GetUsedReservation(); + int buffer_len = handles[i].len(); + pool.FreeBuffer(&client, &handles[i]); + DCHECK_EQ(client_tracker->GetUsedReservation(), used_before - buffer_len); + } + + pool.DeregisterClient(&client); + client_tracker->Close(); + + // All the reservations should be released at this point. + DCHECK_EQ(global_reservations_.GetReservation(), 0); + global_reservations_.Close(); +} + +/// Test transfer of buffer handles between clients. +TEST_F(BufferPoolTest, BufferTransfer) { + // Each client needs to have enough reservation for a buffer. + const int num_clients = 5; + int64_t total_mem = num_clients * TEST_BUFFER_LEN; + global_reservations_.InitRootTracker(NULL, total_mem); + BufferPool pool(TEST_BUFFER_LEN, total_mem); + ReservationTracker client_trackers[num_clients]; + BufferPool::Client clients[num_clients]; + BufferPool::BufferHandle handles[num_clients]; + for (int i = 0; i < num_clients; ++i) { + client_trackers[i].InitChildTracker( + NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN); + ASSERT_TRUE(client_trackers[i].IncreaseReservationToFit(TEST_BUFFER_LEN)); + ASSERT_OK(pool.RegisterClient("test client", &client_trackers[i], &clients[i])); + } + + // Transfer the page around between the clients repeatedly in a circle. + ASSERT_OK(pool.AllocateBuffer(&clients[0], TEST_BUFFER_LEN, &handles[0])); + uint8_t* data = handles[0].data(); + for (int iter = 0; iter < 10; ++iter) { + for (int client = 0; client < num_clients; ++client) { + int next_client = (client + 1) % num_clients; + ASSERT_OK(pool.TransferBuffer(&clients[client], &handles[client], + &clients[next_client], &handles[next_client])); + // Check that the transfer left things in a consistent state. 
+ ASSERT_FALSE(handles[client].is_open()); + ASSERT_EQ(0, client_trackers[client].GetUsedReservation()); + ASSERT_TRUE(handles[next_client].is_open()); + ASSERT_EQ(TEST_BUFFER_LEN, client_trackers[next_client].GetUsedReservation()); + // The same underlying buffer should be used. + ASSERT_EQ(data, handles[next_client].data()); } } + + pool.FreeBuffer(&clients[0], &handles[0]); + for (int i = 0; i < num_clients; ++i) { + pool.DeregisterClient(&clients[i]); + client_trackers[i].Close(); + } + DCHECK_EQ(global_reservations_.GetReservation(), 0); + global_reservations_.Close(); +} + +/// Test basic pinning and unpinning. +TEST_F(BufferPoolTest, Pin) { + int64_t total_mem = TEST_BUFFER_LEN * 1024; + // Set up client with enough reservation to pin twice. + int64_t child_reservation = TEST_BUFFER_LEN * 2; + BufferPool pool(TEST_BUFFER_LEN, total_mem); + global_reservations_.InitRootTracker(NULL, total_mem); + ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); + client_tracker->InitChildTracker( + NewProfile(), &global_reservations_, NULL, child_reservation); + ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation)); + BufferPool::Client client; + ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client)); + + BufferPool::PageHandle handle1, handle2; + + // Can pin two minimum sized pages. + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1)); + ASSERT_TRUE(handle1.is_open()); + ASSERT_TRUE(handle1.is_pinned()); + ASSERT_TRUE(handle1.data() != NULL); + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2)); + ASSERT_TRUE(handle2.is_open()); + ASSERT_TRUE(handle2.is_pinned()); + ASSERT_TRUE(handle2.data() != NULL); + + pool.Unpin(&client, &handle2); + ASSERT_FALSE(handle2.is_pinned()); + + // Can pin minimum-sized page twice. + ASSERT_OK(pool.Pin(&client, &handle1)); + ASSERT_TRUE(handle1.is_pinned()); + // Have to unpin twice. 
+ pool.Unpin(&client, &handle1); + ASSERT_TRUE(handle1.is_pinned()); + pool.Unpin(&client, &handle1); + ASSERT_FALSE(handle1.is_pinned()); + + // Can pin double-sized page only once. + BufferPool::PageHandle double_handle; + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle)); + ASSERT_TRUE(double_handle.is_open()); + ASSERT_TRUE(double_handle.is_pinned()); + ASSERT_TRUE(double_handle.data() != NULL); + + // Destroy the pages - test destroying both pinned and unpinned. + pool.DestroyPage(&client, &handle1); + pool.DestroyPage(&client, &handle2); + pool.DestroyPage(&client, &double_handle); + + pool.DeregisterClient(&client); + client_tracker->Close(); +} + +/// Creating a page or pinning without sufficient reservation should DCHECK. +TEST_F(BufferPoolTest, PinWithoutReservation) { + int64_t total_mem = TEST_BUFFER_LEN * 1024; + BufferPool pool(TEST_BUFFER_LEN, total_mem); + global_reservations_.InitRootTracker(NULL, total_mem); + ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); + client_tracker->InitChildTracker( + NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN); + BufferPool::Client client; + ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client)); + + BufferPool::PageHandle handle; + IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle), ""); + + // Should succeed after increasing reservation. + ASSERT_TRUE(client_tracker->IncreaseReservationToFit(TEST_BUFFER_LEN)); + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle)); + + // But we can't pin again. + IMPALA_ASSERT_DEBUG_DEATH(pool.Pin(&client, &handle), ""); + + pool.DestroyPage(&client, &handle); + pool.DeregisterClient(&client); + client_tracker->Close(); +} + +TEST_F(BufferPoolTest, ExtractBuffer) { + int64_t total_mem = TEST_BUFFER_LEN * 1024; + // Set up client with enough reservation for two buffers/pins. 
+ int64_t child_reservation = TEST_BUFFER_LEN * 2; + BufferPool pool(TEST_BUFFER_LEN, total_mem); + global_reservations_.InitRootTracker(NULL, total_mem); + ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); + client_tracker->InitChildTracker( + NewProfile(), &global_reservations_, NULL, child_reservation); + ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation)); + BufferPool::Client client; + ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client)); + + BufferPool::PageHandle page; + BufferPool::BufferHandle buffer; + + // Test basic buffer extraction. + for (int len = TEST_BUFFER_LEN; len <= 2 * TEST_BUFFER_LEN; len *= 2) { + ASSERT_OK(pool.CreatePage(&client, len, &page)); + uint8_t* page_data = page.data(); + pool.ExtractBuffer(&client, &page, &buffer); + ASSERT_FALSE(page.is_open()); + ASSERT_TRUE(buffer.is_open()); + ASSERT_EQ(len, buffer.len()); + ASSERT_EQ(page_data, buffer.data()); + ASSERT_EQ(len, client_tracker->GetUsedReservation()); + pool.FreeBuffer(&client, &buffer); + ASSERT_EQ(0, client_tracker->GetUsedReservation()); + } + + // Test that ExtractBuffer() accounts correctly for pin count > 1. + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page)); + uint8_t* page_data = page.data(); + ASSERT_OK(pool.Pin(&client, &page)); + ASSERT_EQ(TEST_BUFFER_LEN * 2, client_tracker->GetUsedReservation()); + pool.ExtractBuffer(&client, &page, &buffer); + ASSERT_EQ(TEST_BUFFER_LEN, client_tracker->GetUsedReservation()); + ASSERT_FALSE(page.is_open()); + ASSERT_TRUE(buffer.is_open()); + ASSERT_EQ(TEST_BUFFER_LEN, buffer.len()); + ASSERT_EQ(page_data, buffer.data()); + pool.FreeBuffer(&client, &buffer); + ASSERT_EQ(0, client_tracker->GetUsedReservation()); + + // Test that ExtractBuffer() DCHECKs for unpinned pages. 
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page)); + pool.Unpin(&client, &page); + IMPALA_ASSERT_DEBUG_DEATH(pool.ExtractBuffer(&client, &page, &buffer), ""); + pool.DestroyPage(&client, &page); + + pool.DeregisterClient(&client); + client_tracker->Close(); +} + +// Test concurrent creation and destruction of pages. +TEST_F(BufferPoolTest, ConcurrentPageCreation) { + int ops_per_thread = 1024; + int num_threads = 64; + // Need enough buffers for all initial reservations. + int total_mem = num_threads * TEST_BUFFER_LEN; + global_reservations_.InitRootTracker(NULL, total_mem); + + BufferPool pool(TEST_BUFFER_LEN, total_mem); + + // Launch threads, each with a different set of query IDs. + thread_group workers; + for (int i = 0; i < num_threads; ++i) { + workers.add_thread(new thread(bind(&BufferPoolTest::CreatePageLoop, this, &pool, + &global_reservations_, ops_per_thread))); + } + + // Build debug string to test concurrent iteration over pages_ list. + for (int i = 0; i < 64; ++i) { + LOG(INFO) << pool.DebugString(); + } + workers.join_all(); + + // All the reservations should be released at this point. 
+ DCHECK_EQ(global_reservations_.GetChildReservations(), 0); + global_reservations_.Close(); +} + +void BufferPoolTest::CreatePageLoop( + BufferPool* pool, ReservationTracker* parent_tracker, int num_ops) { + ReservationTracker client_tracker; + client_tracker.InitChildTracker(NewProfile(), parent_tracker, NULL, TEST_BUFFER_LEN); + BufferPool::Client client; + ASSERT_OK(pool->RegisterClient("test client", &client_tracker, &client)); + for (int i = 0; i < num_ops; ++i) { + BufferPool::PageHandle handle; + ASSERT_TRUE(client_tracker.IncreaseReservation(TEST_BUFFER_LEN)); + ASSERT_OK(pool->CreatePage(&client, TEST_BUFFER_LEN, &handle)); + pool->Unpin(&client, &handle); + ASSERT_OK(pool->Pin(&client, &handle)); + pool->DestroyPage(&client, &handle); + client_tracker.DecreaseReservation(TEST_BUFFER_LEN); + } + pool->DeregisterClient(&client); + client_tracker.Close(); +} + +/// Test error path where pool is unable to fulfill a reservation because it cannot evict +/// unpinned pages. +TEST_F(BufferPoolTest, CapacityExhausted) { + global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN); + // TODO: once we enable spilling, set up buffer pool so that spilling is disabled. + // Set up pool with one more buffer than reservations (so that we hit the reservation + // limit instead of the buffer limit). + BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN * 2); + + BufferPool::PageHandle handle1, handle2, handle3; + + BufferPool::Client client; + ASSERT_OK(pool.RegisterClient("test client", &global_reservations_, &client)); + ASSERT_TRUE(global_reservations_.IncreaseReservation(TEST_BUFFER_LEN)); + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1)); + + // Do not have enough reservations because we pinned the page. + IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2), ""); + + // Even with reservations, we can only create one more unpinned page because we don't + // support eviction yet. 
+ pool.Unpin(&client, &handle1); + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2)); + pool.Unpin(&client, &handle2); + ASSERT_FALSE(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3).ok()); + + // After destroying a page, we should have a free buffer that we can use. + pool.DestroyPage(&client, &handle1); + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3)); + + pool.DestroyPage(&client, &handle2); + pool.DestroyPage(&client, &handle3); + pool.DeregisterClient(&client); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-pool.cc ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/buffer-pool.cc b/be/src/bufferpool/buffer-pool.cc index 21893fc..c89b8f6 100644 --- a/be/src/bufferpool/buffer-pool.cc +++ b/be/src/bufferpool/buffer-pool.cc @@ -31,13 +31,151 @@ using strings::Substitute; namespace impala { +/// The internal representation of a page, which can be pinned or unpinned. If the +/// page is pinned, a buffer is associated with the page. +/// +/// Code manipulating the page is responsible for acquiring 'lock' when reading or +/// modifying the page. +struct BufferPool::Page : public BufferPool::PageList::Node { + Page(int64_t len) : len(len), pin_count(0), dirty(false) {} + + /// Increment the pin count. Caller must hold 'lock'. + void IncrementPinCount(PageHandle* handle) { + lock.DCheckLocked(); + ++pin_count; + // Pinned page buffers may be modified by anyone with a pointer to the buffer, so we + // have to assume they are dirty. + dirty = true; + } + + /// Decrement the pin count. Caller must hold 'lock'. + void DecrementPinCount(PageHandle* handle) { + lock.DCheckLocked(); + DCHECK(pin_count > 0); + --pin_count; + } + + string DebugString() { + return Substitute("<BufferPool::Page> $0 len: $1 pin_count: $2 buf: $3 dirty: $4", this, + len, pin_count, buffer.DebugString(), dirty); + } + + // Helper for BufferPool::DebugString(). 
+ static bool DebugStringCallback(stringstream* ss, BufferPool::Page* page) { + lock_guard<SpinLock> pl(page->lock); + (*ss) << page->DebugString() << "\n"; + return true; + } + + /// The length of the page in bytes. + const int64_t len; + + /// Lock to protect the below members of Page. The lock must be held when modifying any + /// of the below members and when reading any of the below members of an unpinned page. + SpinLock lock; + + /// The pin count of the page. + int pin_count; + + /// Buffer with the page's contents, Always open if pinned. Closed if page is unpinned + /// and was evicted from memory. + BufferHandle buffer; + + /// True if the buffer's contents need to be saved before evicting it from memory. + bool dirty; +}; + +BufferPool::BufferHandle::BufferHandle() { + Reset(); +} + +BufferPool::BufferHandle::BufferHandle(BufferHandle&& src) { + *this = std::move(src); +} + +BufferPool::BufferHandle& BufferPool::BufferHandle::operator=(BufferHandle&& src) { + DCHECK(!is_open()); + // Copy over all members then close src. + client_ = src.client_; + data_ = src.data_; + len_ = src.len_; + src.Reset(); + return *this; +} + +void BufferPool::BufferHandle::Open(const Client* client, uint8_t* data, int64_t len) { + client_ = client; + data_ = data; + len_ = len; +} + +void BufferPool::BufferHandle::Reset() { + client_ = NULL; + data_ = NULL; + len_ = -1; +} + +BufferPool::PageHandle::PageHandle() { + Reset(); +} + +BufferPool::PageHandle::PageHandle(PageHandle&& src) { + *this = std::move(src); +} + +BufferPool::PageHandle& BufferPool::PageHandle::operator=(PageHandle&& src) { + DCHECK(!is_open()); + // Copy over all members then close src. 
+ page_ = src.page_; + client_ = src.client_; + src.Reset(); + return *this; +} + +void BufferPool::PageHandle::Open(Page* page, Client* client) { + DCHECK(!is_open()); + page->lock.DCheckLocked(); + page_ = page; + client_ = client; +} + +void BufferPool::PageHandle::Reset() { + page_ = NULL; + client_ = NULL; +} + +int BufferPool::PageHandle::pin_count() const { + DCHECK(is_open()); + // The pin count can only be modified via this PageHandle, which must not be + // concurrently accessed by multiple threads, so it is safe to access without locking + return page_->pin_count; +} + +int64_t BufferPool::PageHandle::len() const { + DCHECK(is_open()); + // The length of the page cannot change, so it is safe to access without locking. + return page_->len; +} + +const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const { + DCHECK(is_pinned()); + // The 'buffer' field cannot change while the page is pinned, so it is safe to access + // without locking. + return &page_->buffer; +} + BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit) - : min_buffer_len_(min_buffer_len), buffer_bytes_limit_(buffer_bytes_limit) { + : allocator_(new BufferAllocator(min_buffer_len)), + min_buffer_len_(min_buffer_len), + buffer_bytes_limit_(buffer_bytes_limit), + buffer_bytes_remaining_(buffer_bytes_limit) { DCHECK_GT(min_buffer_len, 0); DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len)); } -BufferPool::~BufferPool() {} +BufferPool::~BufferPool() { + DCHECK(pages_.empty()); +} Status BufferPool::RegisterClient( const string& name, ReservationTracker* reservation, Client* client) { @@ -55,15 +193,228 @@ void BufferPool::DeregisterClient(Client* client) { client->reservation_ = NULL; } +Status BufferPool::CreatePage(Client* client, int64_t len, PageHandle* handle) { + DCHECK(!handle->is_open()); + DCHECK_GE(len, min_buffer_len_); + DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len)); + + BufferHandle buffer; + // No changes have been 
made to state yet, so we can cleanly return on error. + RETURN_IF_ERROR(AllocateBufferInternal(client, len, &buffer)); + + Page* page = new Page(len); + { + lock_guard<SpinLock> pl(page->lock); + page->buffer = std::move(buffer); + handle->Open(page, client); + page->IncrementPinCount(handle); + } + + // Only add to globally-visible list after page is initialized. The page lock also + // needs to be released before enqueueing to respect the lock ordering. + pages_.Enqueue(page); + + client->reservation_->AllocateFrom(len); + return Status::OK(); +} + +void BufferPool::DestroyPage(Client* client, PageHandle* handle) { + if (!handle->is_open()) return; // DestroyPage() should be idempotent. + + Page* page = handle->page_; + if (handle->is_pinned()) { + // In the pinned case, delegate to ExtractBuffer() and FreeBuffer() to do the work + // of cleaning up the page and freeing the buffer. + BufferHandle buffer; + ExtractBuffer(client, handle, &buffer); + FreeBuffer(client, &buffer); + return; + } + + { + lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its state. + // In the unpinned case, no reservation is consumed, so just free the buffer. + // TODO: wait for in-flight writes for 'page' so we can safely free 'page'. + if (page->buffer.is_open()) FreeBufferInternal(&page->buffer); + } + CleanUpPage(handle); +} + +void BufferPool::CleanUpPage(PageHandle* handle) { + // Remove the destroyed page from data structures in a way that ensures no other + // threads have a remaining reference. Threads that access pages via the 'pages_' + // list hold 'pages_.lock_', so Remove() will not return until those threads are done + // and it is safe to delete page. 
+ pages_.Remove(handle->page_); + delete handle->page_; + handle->Reset(); +} + +Status BufferPool::Pin(Client* client, PageHandle* handle) { + DCHECK(client->is_registered()); + DCHECK(handle->is_open()); + DCHECK_EQ(handle->client_, client); + + Page* page = handle->page_; + { + lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its state. + if (!page->buffer.is_open()) { + // No changes have been made to state yet, so we can cleanly return on error. + RETURN_IF_ERROR(AllocateBufferInternal(client, page->len, &page->buffer)); + } + page->IncrementPinCount(handle); + + // TODO: will need to initiate/wait for read if the page is not in-memory. + } + + client->reservation_->AllocateFrom(page->len); + return Status::OK(); +} + +void BufferPool::Unpin(Client* client, PageHandle* handle) { + DCHECK(handle->is_open()); + lock_guard<SpinLock> pl(handle->page_->lock); + UnpinLocked(client, handle); +} + +void BufferPool::UnpinLocked(Client* client, PageHandle* handle) { + DCHECK(client->is_registered()); + DCHECK_EQ(handle->client_, client); + // If handle is pinned, we can assume that the page itself is pinned. + DCHECK(handle->is_pinned()); + Page* page = handle->page_; + page->lock.DCheckLocked(); + + page->DecrementPinCount(handle); + client->reservation_->ReleaseTo(page->len); + + // TODO: can evict now. Only need to preserve contents if 'page->dirty' is true. +} + +void BufferPool::ExtractBuffer( + Client* client, PageHandle* page_handle, BufferHandle* buffer_handle) { + DCHECK(page_handle->is_pinned()); + DCHECK_EQ(page_handle->client_, client); + + Page* page = page_handle->page_; + { + lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its state. + // TODO: wait for in-flight writes for 'page' so we can safely free 'page'. + + // Bring the pin count to 1 so that we're not using surplus reservations. 
+ while (page->pin_count > 1) UnpinLocked(client, page_handle); + *buffer_handle = std::move(page->buffer); + } + CleanUpPage(page_handle); +} + +Status BufferPool::AllocateBuffer(Client* client, int64_t len, BufferHandle* handle) { + client->reservation_->AllocateFrom(len); + return AllocateBufferInternal(client, len, handle); +} + +Status BufferPool::AllocateBufferInternal( + Client* client, int64_t len, BufferHandle* buffer) { + DCHECK(!buffer->is_open()); + DCHECK_GE(len, min_buffer_len_); + DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len)); + + // If there is headroom in 'buffer_bytes_remaining_', we can just allocate a new buffer. + if (TryDecreaseBufferBytesRemaining(len)) { + uint8_t* data; + Status status = allocator_->Allocate(len, &data); + if (!status.ok()) { + buffer_bytes_remaining_.Add(len); + return status; + } + DCHECK(data != NULL); + buffer->Open(client, data, len); + return Status::OK(); + } + + // If there is no remaining capacity, we must evict another page. + return Status(TErrorCode::NOT_IMPLEMENTED_ERROR, + Substitute("Buffer bytes limit $0 of buffer pool is exhausted and page eviction is " + "not implemented yet!", buffer_bytes_limit_)); +} + +void BufferPool::FreeBuffer(Client* client, BufferHandle* handle) { + if (!handle->is_open()) return; // Should be idempotent. 
+ DCHECK_EQ(client, handle->client_); + client->reservation_->ReleaseTo(handle->len_); + FreeBufferInternal(handle); +} + +void BufferPool::FreeBufferInternal(BufferHandle* handle) { + DCHECK(handle->is_open()); + allocator_->Free(handle->data(), handle->len()); + buffer_bytes_remaining_.Add(handle->len()); + handle->Reset(); +} + +Status BufferPool::TransferBuffer( + Client* src_client, BufferHandle* src, Client* dst_client, BufferHandle* dst) { + DCHECK(src->is_open()); + DCHECK(!dst->is_open()); + DCHECK_EQ(src_client, src->client_); + DCHECK_NE(src, dst); + DCHECK_NE(src_client, dst_client); + + dst_client->reservation_->AllocateFrom(src->len()); + src_client->reservation_->ReleaseTo(src->len()); + *dst = std::move(*src); + dst->client_ = dst_client; + return Status::OK(); +} + +bool BufferPool::TryDecreaseBufferBytesRemaining(int64_t len) { + // TODO: we may want to change this policy so that we don't always use up to the limit + // for buffers, since this may starve other operators using non-buffer-pool memory. 
+ while (true) { + int64_t old_value = buffer_bytes_remaining_.Load(); + if (old_value < len) return false; + int64_t new_value = old_value - len; + if (buffer_bytes_remaining_.CompareAndSwap(old_value, new_value)) { + return true; + } + } +} + string BufferPool::Client::DebugString() const { - return Substitute("<BufferPool::Client> $0 name: $1 reservation: $2", this, name_, - reservation_->DebugString()); + if (is_registered()) { + return Substitute("<BufferPool::Client> $0 name: $1 reservation: {$2}", this, name_, + reservation_->DebugString()); + } else { + return Substitute("<BufferPool::Client> $0 UNREGISTERED", this); + } +} + +string BufferPool::PageHandle::DebugString() const { + if (is_open()) { + lock_guard<SpinLock> pl(page_->lock); + return Substitute( + "<BufferPool::PageHandle> $0 client: {$1} page: {$2}", + this, client_->DebugString(), page_->DebugString()); + } else { + return Substitute("<BufferPool::PageHandle> $0 CLOSED", this); + } +} + +string BufferPool::BufferHandle::DebugString() const { + if (is_open()) { + return Substitute("<BufferPool::BufferHandle> $0 client: {$1} data: $2 len: $3", this, + client_->DebugString(), data_, len_); + } else { + return Substitute("<BufferPool::BufferHandle> $0 CLOSED", this); + } } string BufferPool::DebugString() { stringstream ss; ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_ - << "buffer_bytes_limit: " << buffer_bytes_limit_ << "\n"; + << " buffer_bytes_limit: " << buffer_bytes_limit_ + << " buffer_bytes_remaining: " << buffer_bytes_remaining_.Load() << "\n"; + pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1)); return ss.str(); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-pool.h ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/buffer-pool.h b/be/src/bufferpool/buffer-pool.h index e566543..44b5574 100644 --- a/be/src/bufferpool/buffer-pool.h +++ 
b/be/src/bufferpool/buffer-pool.h @@ -18,10 +18,12 @@ #ifndef IMPALA_BUFFER_POOL_H #define IMPALA_BUFFER_POOL_H +#include <stdint.h> +#include <boost/scoped_ptr.hpp> #include <boost/thread/locks.hpp> #include <string> -#include <stdint.h> +#include "bufferpool/buffer-allocator.h" #include "common/atomic.h" #include "common/status.h" #include "gutil/macros.h" @@ -30,6 +32,7 @@ namespace impala { +class BufferAllocator; class ReservationTracker; /// A buffer pool that manages memory buffers for all queries in an Impala daemon. @@ -69,7 +72,7 @@ class ReservationTracker; /// Buffer/Page Sizes /// ================= /// The buffer pool has a minimum buffer size, which must be a power-of-two. Page and -/// buffer sizes must be an exact multiple of the minimum buffer size. +/// buffer sizes must be an exact power-of-two multiple of the minimum buffer size. /// /// Reservations /// ============ @@ -140,11 +143,18 @@ class ReservationTracker; /// +========================+ /// | IMPLEMENTATION DETAILS | /// +========================+ -/// ... TODO ... +/// +/// Lock Ordering +/// ============= +/// The lock ordering is: +/// * pages_::lock_ -> Page::lock_ +/// +/// If a reference to a page is acquired via the pages_ list, pages_::lock_ must be held +/// until done with the page to ensure the page isn't concurrently deleted. class BufferPool { public: - class Client; class BufferHandle; + class Client; class PageHandle; /// Constructs a new buffer pool. @@ -198,9 +208,10 @@ class BufferPool { /// Extracts buffer from a pinned page. After this returns, the page referenced by /// 'page_handle' will be destroyed and 'buffer_handle' will reference the buffer from - /// 'page_handle'. This may decrease reservation usage if the page was pinned multiple - /// times via 'page_handle'. - void ExtractBuffer(PageHandle* page_handle, BufferHandle* buffer_handle); + /// 'page_handle'. 
This may decrease reservation usage of 'client' if the page was + /// pinned multiple times via 'page_handle'. + void ExtractBuffer( + Client* client, PageHandle* page_handle, BufferHandle* buffer_handle); /// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. The caller /// is responsible for ensuring it has enough unused reservation before calling @@ -214,9 +225,9 @@ class BufferPool { /// Transfer ownership of buffer from 'src_client' to 'dst_client' and move the /// handle from 'src' to 'dst'. Increases reservation usage in 'dst_client' and - /// decreases reservation usage in 'src_client'. 'src' must be open and 'dst' must - /// be closed - /// before calling. After a successful call, 'src' is closed and 'dst' is open. + /// decreases reservation usage in 'src_client'. 'src' must be open and 'dst' must be + /// closed before calling. 'src'/'dst' and 'src_client'/'dst_client' must be different. + /// After a successful call, 'src' is closed and 'dst' is open. Status TransferBuffer(Client* src_client, BufferHandle* src, Client* dst_client, BufferHandle* dst); @@ -228,13 +239,49 @@ class BufferPool { private: DISALLOW_COPY_AND_ASSIGN(BufferPool); + struct Page; + + /// Same as Unpin(), except the lock for the page referenced by 'handle' must be held + /// by the caller. + void UnpinLocked(Client* client, PageHandle* handle); + + /// Perform the cleanup of the page object and handle when the page is destroyed. + /// Reset 'handle', free the Page object and remove the 'pages_' entry. + /// The 'handle->page_' lock should *not* be held by the caller. + void CleanUpPage(PageHandle* handle); - /// The minimum length of a buffer in bytes. All buffers and pages are a multiple of - /// this length. This is always a power of two. + /// Allocate a buffer of length 'len'. Assumes that the client's reservation has already + /// been consumed for the buffer. Returns an error if the pool is unable to fulfill the + /// reservation. 
+ Status AllocateBufferInternal(Client* client, int64_t len, BufferHandle* buffer); + + /// Frees 'buffer', which must be open before calling. Closes 'buffer' and updates + /// internal state but does not release to any reservation. + void FreeBufferInternal(BufferHandle* buffer); + + /// Check if we can allocate another buffer of size 'len' bytes without + /// 'buffer_bytes_remaining_' going negative. + /// Returns true and decrease 'buffer_bytes_remaining_' by 'len' if successful. + bool TryDecreaseBufferBytesRemaining(int64_t len); + + /// Allocator for allocating and freeing all buffer memory. + boost::scoped_ptr<BufferAllocator> allocator_; + + /// The minimum length of a buffer in bytes. All buffers and pages are a power-of-two + /// multiple of this length. This is always a power of two. const int64_t min_buffer_len_; /// The maximum physical memory in bytes that can be used for buffers. const int64_t buffer_bytes_limit_; + + /// The remaining number of bytes of 'buffer_bytes_limit_' that can be used for + /// allocating new buffers. Must be updated atomically before a new buffer is + /// allocated or after an existing buffer is freed. + AtomicInt64 buffer_bytes_remaining_; + + /// List containing all pages. Protected by the list's internal lock. + typedef InternalQueue<Page> PageList; + PageList pages_; }; /// External representation of a client of the BufferPool. Clients are used for @@ -266,6 +313,54 @@ class BufferPool::Client { ReservationTracker* reservation_; }; +/// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only +/// be used by a single thread at a time: concurrently calling BufferHandle methods or +/// BufferPool methods with the BufferHandle as an argument is not supported. +class BufferPool::BufferHandle { + public: + BufferHandle(); + ~BufferHandle() { DCHECK(!is_open()); } + + /// Allow move construction of handles, to support std::move(). 
+ BufferHandle(BufferHandle&& src); + + /// Allow move assignment of handles, to support STL classes like std::vector. + /// Destination must be uninitialized. + BufferHandle& operator=(BufferHandle&& src); + + bool is_open() const { return data_ != NULL; } + int64_t len() const { + DCHECK(is_open()); + return len_; + } + /// Get a pointer to the start of the buffer. + uint8_t* data() const { + DCHECK(is_open()); + return data_; + } + + std::string DebugString() const; + + private: + DISALLOW_COPY_AND_ASSIGN(BufferHandle); + friend class BufferPool; + + /// Internal helper to set the handle to an opened state. + void Open(const Client* client, uint8_t* data, int64_t len); + + /// Internal helper to reset the handle to an unopened state. + void Reset(); + + /// The client the buffer handle belongs to, used to validate that the correct client + /// is provided in BufferPool method calls. + const Client* client_; + + /// Pointer to the start of the buffer. Non-NULL if open, NULL if closed. + uint8_t* data_; + + /// Length of the buffer in bytes. + int64_t len_; +}; /// The handle for a page used by clients of the BufferPool. Each PageHandle should /// only be used by a single thread at a time: concurrently calling PageHandle methods @@ -282,12 +377,13 @@ class BufferPool::PageHandle { // Destination must be closed. PageHandle& operator=(PageHandle&& src); - bool is_open() const; - bool is_pinned() const; + bool is_open() const { return page_ != NULL; } + bool is_pinned() const { return pin_count() > 0; } + int pin_count() const; int64_t len() const; /// Get a pointer to the start of the page's buffer. Only valid to call if the page /// is pinned via this handle. - uint8_t* data() const; + uint8_t* data() const { return buffer_handle()->data(); } /// Return a pointer to the page's buffer handle. Only valid to call if the page is /// pinned via this handle. 
Only const accessors of the returned handle can be used: @@ -299,32 +395,21 @@ class BufferPool::PageHandle { private: DISALLOW_COPY_AND_ASSIGN(PageHandle); -}; - -/// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only -/// be used by a single thread at a time: concurrently calling BufferHandle methods or -/// BufferPool methods with the BufferHandle as an argument is not supported. -class BufferPool::BufferHandle { - public: - BufferHandle(); - ~BufferHandle() { DCHECK(!is_open()); } + friend class BufferPool; + friend class Page; - /// Allow move construction of handles, to support std::move(). - BufferHandle(BufferHandle&& src); + /// Internal helper to open the handle for the given page. + void Open(Page* page, Client* client); - /// Allow move assignment of handles, to support STL classes like std::vector. - /// Destination must be uninitialized. - BufferHandle& operator=(BufferHandle&& src); + /// Internal helper to reset the handle to an unopened state. + void Reset(); - bool is_open() const; - int64_t len() const; - /// Get a pointer to the start of the buffer. - uint8_t* data() const; + /// The internal page structure. NULL if the handle is not open. + Page* page_; - std::string DebugString() const; - - private: - DISALLOW_COPY_AND_ASSIGN(BufferHandle); + /// The client the page handle belongs to, used to validate that the correct client + /// is being used. 
+ const Client* client_; }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/util/internal-queue.h ---------------------------------------------------------------------- diff --git a/be/src/util/internal-queue.h b/be/src/util/internal-queue.h index 08365d7..37a9a0c 100644 --- a/be/src/util/internal-queue.h +++ b/be/src/util/internal-queue.h @@ -19,6 +19,7 @@ #ifndef IMPALA_UTIL_INTERNAL_QUEUE_H #define IMPALA_UTIL_INTERNAL_QUEUE_H +#include <boost/function.hpp> #include <boost/thread/locks.hpp> #include "util/spinlock.h" @@ -231,6 +232,16 @@ class InternalQueue { return true; } + // Iterate over elements of queue, calling 'fn' for each element. If 'fn' returns + // false, terminate iteration. It is invalid to call other InternalQueue methods + // from 'fn'. + void Iterate(boost::function<bool(T*)> fn) { + boost::lock_guard<SpinLock> lock(lock_); + for (Node* current = head_; current != NULL; current = current->next) { + if (!fn(reinterpret_cast<T*>(current))) return; + } + } + /// Prints the queue ptrs to a string. std::string DebugString() { std::stringstream ss; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/common/thrift/generate_error_codes.py ---------------------------------------------------------------------- diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index 3d48005..2554a18 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -283,7 +283,9 @@ error_codes = ( "supported length of 2147483647 bytes."), ("SCRATCH_LIMIT_EXCEEDED", 91, "Scratch space limit of $0 bytes exceeded for query " - "while spilling data to disk.") + "while spilling data to disk."), + + ("BUFFER_ALLOCATION_FAILED", 92, "Unexpected error allocating $0 byte buffer."), ) import sys
