IMPALA-3200: move bufferpool under runtime. It is arguably a subcomponent of the runtime system.
Change-Id: I3507066fb5cb955f0872d31a4619bd8bb7d647cd Reviewed-on: http://gerrit.cloudera.org:8080/5165 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ac44d6c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ac44d6c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ac44d6c3 Branch: refs/heads/master Commit: ac44d6c3ef0f71452cc47f7a61659e1b939110c3 Parents: eb03186 Author: Tim Armstrong <[email protected]> Authored: Mon Nov 21 11:34:09 2016 -0800 Committer: Impala Public Jenkins <[email protected]> Committed: Tue Nov 22 07:31:34 2016 +0000 ---------------------------------------------------------------------- .clang-tidy | 1 - be/CMakeLists.txt | 4 +- be/src/bufferpool/CMakeLists.txt | 32 -- be/src/bufferpool/buffer-allocator.cc | 39 -- be/src/bufferpool/buffer-allocator.h | 48 -- be/src/bufferpool/buffer-pool-counters.h | 47 -- be/src/bufferpool/buffer-pool-test.cc | 554 ------------------- be/src/bufferpool/buffer-pool.cc | 439 --------------- be/src/bufferpool/buffer-pool.h | 426 -------------- .../bufferpool/reservation-tracker-counters.h | 41 -- be/src/bufferpool/reservation-tracker-test.cc | 378 ------------- be/src/bufferpool/reservation-tracker.cc | 306 ---------- be/src/bufferpool/reservation-tracker.h | 248 --------- be/src/runtime/CMakeLists.txt | 1 + be/src/runtime/bufferpool/CMakeLists.txt | 32 ++ be/src/runtime/bufferpool/buffer-allocator.cc | 39 ++ be/src/runtime/bufferpool/buffer-allocator.h | 48 ++ .../runtime/bufferpool/buffer-pool-counters.h | 47 ++ be/src/runtime/bufferpool/buffer-pool-test.cc | 554 +++++++++++++++++++ be/src/runtime/bufferpool/buffer-pool.cc | 439 +++++++++++++++ be/src/runtime/bufferpool/buffer-pool.h | 426 ++++++++++++++ .../bufferpool/reservation-tracker-counters.h | 41 ++ .../bufferpool/reservation-tracker-test.cc | 
378 +++++++++++++ .../runtime/bufferpool/reservation-tracker.cc | 306 ++++++++++ be/src/runtime/bufferpool/reservation-tracker.h | 248 +++++++++ be/src/runtime/mem-tracker.cc | 2 +- 26 files changed, 2561 insertions(+), 2563 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/.clang-tidy ---------------------------------------------------------------------- diff --git a/.clang-tidy b/.clang-tidy index 5a2bf70..fcf5922 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -71,7 +71,6 @@ Checks: "-*,clang*,\ HeaderFilterRegex: "be/src/\ (benchmarks\ -|bufferpool\ |catalog\ |codegen\ |common\ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 435ab19..6a076f8 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -181,7 +181,6 @@ if (DOXYGEN_FOUND) # Possible to not input the subdirs one by one? set(CMAKE_DOXYGEN_INPUT ${CMAKE_SOURCE_DIR}/be/src - ${CMAKE_SOURCE_DIR}/be/src/bufferpool/ ${CMAKE_SOURCE_DIR}/be/src/catalog/ ${CMAKE_SOURCE_DIR}/be/src/common/ ${CMAKE_SOURCE_DIR}/be/src/exec/ @@ -390,7 +389,7 @@ add_custom_target(be-test) # Utility CMake function to make specifying tests and benchmarks less verbose FUNCTION(ADD_BE_TEST TEST_NAME) # This gets the directory where the test is from (e.g. 
'exprs' or 'runtime') - get_filename_component(DIR_NAME ${CMAKE_CURRENT_SOURCE_DIR} NAME) + file(RELATIVE_PATH DIR_NAME "${CMAKE_SOURCE_DIR}/be/src/" ${CMAKE_CURRENT_SOURCE_DIR}) ADD_EXECUTABLE(${TEST_NAME} ${TEST_NAME}.cc) TARGET_LINK_LIBRARIES(${TEST_NAME} ${IMPALA_TEST_LINK_LIBS}) set(CMAKE_EXE_LINKER_FLAGS "--start-group") @@ -434,7 +433,6 @@ endfunction(COMPILE_TO_IR) add_subdirectory(src/gutil) # compile these subdirs using their own CMakeLists.txt -add_subdirectory(src/bufferpool) add_subdirectory(src/catalog) add_subdirectory(src/codegen) add_subdirectory(src/common) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/CMakeLists.txt b/be/src/bufferpool/CMakeLists.txt deleted file mode 100644 index 2f056e0..0000000 --- a/be/src/bufferpool/CMakeLists.txt +++ /dev/null @@ -1,32 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# where to put generated libraries -set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool") - -# where to put generated binaries -set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool") - -add_library(BufferPool - buffer-allocator.cc - buffer-pool.cc - reservation-tracker.cc -) -add_dependencies(BufferPool thrift-deps) - -ADD_BE_TEST(buffer-pool-test) -ADD_BE_TEST(reservation-tracker-test) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/buffer-allocator.cc ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/buffer-allocator.cc b/be/src/bufferpool/buffer-allocator.cc deleted file mode 100644 index 27bd788..0000000 --- a/be/src/bufferpool/buffer-allocator.cc +++ /dev/null @@ -1,39 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "bufferpool/buffer-allocator.h" - -#include "util/bit-util.h" - -namespace impala { - -BufferAllocator::BufferAllocator(int64_t min_buffer_len) - : min_buffer_len_(min_buffer_len) {} - -Status BufferAllocator::Allocate(int64_t len, uint8_t** buffer) { - DCHECK_GE(len, min_buffer_len_); - DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len)); - - *buffer = reinterpret_cast<uint8_t*>(malloc(len)); - if (*buffer == NULL) return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, len); - return Status::OK(); -} - -void BufferAllocator::Free(uint8_t* buffer, int64_t len) { - free(buffer); -} -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/buffer-allocator.h ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/buffer-allocator.h b/be/src/bufferpool/buffer-allocator.h deleted file mode 100644 index c3e0c70..0000000 --- a/be/src/bufferpool/buffer-allocator.h +++ /dev/null @@ -1,48 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_BUFFER_ALLOCATOR_H -#define IMPALA_BUFFER_ALLOCATOR_H - -#include "common/status.h" - -namespace impala { - -/// The underlying memory allocator for the buffer pool. 
All buffers are allocated through -/// the BufferPool's BufferAllocator. The allocator only handles allocating buffers that -/// are power-of-two multiples of the minimum buffer length. -/// -/// TODO: -/// * Allocate memory with mmap() instead of malloc(). -/// * Implement free lists in the allocator or external to the allocator. -class BufferAllocator { - public: - BufferAllocator(int64_t min_buffer_len); - - /// Allocate memory for a buffer of 'len' bytes. 'len' must be a power-of-two multiple - /// of the minimum buffer length. - Status Allocate(int64_t len, uint8_t** buffer); - - /// Free the memory for a previously-allocated buffer. - void Free(uint8_t* buffer, int64_t len); - - private: - const int64_t min_buffer_len_; -}; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/buffer-pool-counters.h ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/buffer-pool-counters.h b/be/src/bufferpool/buffer-pool-counters.h deleted file mode 100644 index 6f3801e..0000000 --- a/be/src/bufferpool/buffer-pool-counters.h +++ /dev/null @@ -1,47 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#ifndef IMPALA_BUFFER_POOL_COUNTERS_H -#define IMPALA_BUFFER_POOL_COUNTERS_H - -#include "util/runtime-profile.h" - -namespace impala { - -/// A set of counters for each buffer pool client. -struct BufferPoolClientCounters { - public: - /// Amount of time spent trying to get a buffer. - RuntimeProfile::Counter* get_buffer_time; - - /// Amount of time spent waiting for reads from disk to complete. - RuntimeProfile::Counter* read_wait_time; - - /// Amount of time spent waiting for writes to disk to complete. - RuntimeProfile::Counter* write_wait_time; - - /// The peak total size of unpinned buffers. - RuntimeProfile::HighWaterMarkCounter* peak_unpinned_bytes; - - /// The total bytes of data unpinned. Every time a page's pin count goes from 1 to 0, - /// this counter is incremented by the page size. - RuntimeProfile::Counter* total_unpinned_bytes; -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/buffer-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/buffer-pool-test.cc b/be/src/bufferpool/buffer-pool-test.cc deleted file mode 100644 index 793bcb9..0000000 --- a/be/src/bufferpool/buffer-pool-test.cc +++ /dev/null @@ -1,554 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <boost/bind.hpp> -#include <boost/scoped_ptr.hpp> -#include <boost/thread/thread.hpp> -#include <boost/unordered_map.hpp> -#include <cstdlib> -#include <string> -#include <vector> - -#include "bufferpool/buffer-pool.h" -#include "bufferpool/reservation-tracker.h" -#include "common/init.h" -#include "common/object-pool.h" -#include "testutil/death-test-util.h" -#include "testutil/gtest-util.h" - -#include "common/names.h" - -namespace impala { - -class BufferPoolTest : public ::testing::Test { - public: - virtual void SetUp() {} - - virtual void TearDown() { - for (auto entry : query_reservations_) { - ReservationTracker* tracker = entry.second; - tracker->Close(); - } - - global_reservations_.Close(); - obj_pool_.Clear(); - } - - /// The minimum buffer size used in most tests. - const static int64_t TEST_BUFFER_LEN = 1024; - - /// Test helper to simulate registering then deregistering a number of queries with - /// the given initial reservation and reservation limit. - void RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries, - int64_t initial_query_reservation, int64_t query_reservation_limit); - - /// Create and destroy a page multiple times. - void CreatePageLoop(BufferPool* pool, ReservationTracker* parent_tracker, int num_ops); - - protected: - static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 32 | lo; } - - /// Helper function to create one reservation tracker per query. 
- ReservationTracker* GetQueryReservationTracker(int64_t query_id) { - lock_guard<SpinLock> l(query_reservations_lock_); - ReservationTracker* tracker = query_reservations_[query_id]; - if (tracker != NULL) return tracker; - tracker = obj_pool_.Add(new ReservationTracker()); - query_reservations_[query_id] = tracker; - return tracker; - } - - RuntimeProfile* NewProfile() { - return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile")); - } - - ObjectPool obj_pool_; - - ReservationTracker global_reservations_; - - // Map from query_id to the reservation tracker for that query. Reads and modifications - // of the map are protected by query_reservations_lock_. - unordered_map<int64_t, ReservationTracker*> query_reservations_; - SpinLock query_reservations_lock_; -}; - -const int64_t BufferPoolTest::TEST_BUFFER_LEN; - -void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, - int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit) { - Status status; - - int clients_per_query = 32; - BufferPool::Client* clients[num_queries]; - ReservationTracker* client_reservations[num_queries]; - - for (int i = 0; i < num_queries; ++i) { - int64_t query_id = QueryId(query_id_hi, i); - - // Initialize a tracker for a new query. - ReservationTracker* query_reservation = GetQueryReservationTracker(query_id); - query_reservation->InitChildTracker( - NULL, &global_reservations_, NULL, query_reservation_limit); - - // Test that closing then reopening child tracker works. 
- query_reservation->Close(); - query_reservation->InitChildTracker( - NULL, &global_reservations_, NULL, query_reservation_limit); - EXPECT_TRUE(query_reservation->IncreaseReservationToFit(initial_query_reservation)); - - clients[i] = new BufferPool::Client[clients_per_query]; - client_reservations[i] = new ReservationTracker[clients_per_query]; - - for (int j = 0; j < clients_per_query; ++j) { - int64_t initial_client_reservation = - initial_query_reservation / clients_per_query + j - < initial_query_reservation % clients_per_query; - // Reservation limit can be anything greater or equal to the initial reservation. - int64_t client_reservation_limit = initial_client_reservation + rand() % 100000; - client_reservations[i][j].InitChildTracker( - NULL, query_reservation, NULL, client_reservation_limit); - EXPECT_TRUE( - client_reservations[i][j].IncreaseReservationToFit(initial_client_reservation)); - string name = Substitute("Client $0 for query $1", j, query_id); - EXPECT_OK(pool->RegisterClient( - name, &client_reservations[i][j], NewProfile(), &clients[i][j])); - } - - for (int j = 0; j < clients_per_query; ++j) { - ASSERT_TRUE(clients[i][j].is_registered()); - } - } - - // Deregister clients then query. - for (int i = 0; i < num_queries; ++i) { - for (int j = 0; j < clients_per_query; ++j) { - pool->DeregisterClient(&clients[i][j]); - ASSERT_FALSE(clients[i][j].is_registered()); - client_reservations[i][j].Close(); - } - - delete[] clients[i]; - delete[] client_reservations[i]; - - GetQueryReservationTracker(QueryId(query_id_hi, i))->Close(); - } -} - -/// Test that queries and clients can be registered and deregistered with the reservation -/// trackers and the buffer pool. -TEST_F(BufferPoolTest, BasicRegistration) { - int num_concurrent_queries = 1024; - int64_t sum_initial_reservations = 4; - int64_t reservation_limit = 1024; - // Need enough buffers for all initial reservations. 
- int64_t total_mem = sum_initial_reservations * num_concurrent_queries; - global_reservations_.InitRootTracker(NewProfile(), total_mem); - - BufferPool pool(TEST_BUFFER_LEN, total_mem); - - RegisterQueriesAndClients( - &pool, 0, num_concurrent_queries, sum_initial_reservations, reservation_limit); - - DCHECK_EQ(global_reservations_.GetUsedReservation(), 0); - DCHECK_EQ(global_reservations_.GetChildReservations(), 0); - DCHECK_EQ(global_reservations_.GetReservation(), 0); - global_reservations_.Close(); -} - -/// Test that queries and clients can be registered and deregistered by concurrent -/// threads. -TEST_F(BufferPoolTest, ConcurrentRegistration) { - int queries_per_thread = 64; - int num_threads = 64; - int num_concurrent_queries = queries_per_thread * num_threads; - int64_t sum_initial_reservations = 4; - int64_t reservation_limit = 1024; - // Need enough buffers for all initial reservations. - int64_t total_mem = num_concurrent_queries * sum_initial_reservations; - global_reservations_.InitRootTracker(NewProfile(), total_mem); - - BufferPool pool(TEST_BUFFER_LEN, total_mem); - - // Launch threads, each with a different set of query IDs. - thread_group workers; - for (int i = 0; i < num_threads; ++i) { - workers.add_thread(new thread(bind(&BufferPoolTest::RegisterQueriesAndClients, this, - &pool, i, queries_per_thread, sum_initial_reservations, reservation_limit))); - } - workers.join_all(); - - // All the reservations should be released at this point. - DCHECK_EQ(global_reservations_.GetUsedReservation(), 0); - DCHECK_EQ(global_reservations_.GetReservation(), 0); - global_reservations_.Close(); -} - -/// Test basic page handle creation. -TEST_F(BufferPoolTest, PageCreation) { - // Allocate many pages, each a power-of-two multiple of the minimum page length. 
- int num_pages = 16; - int64_t max_page_len = TEST_BUFFER_LEN << (num_pages - 1); - int64_t total_mem = 2 * 2 * max_page_len; - global_reservations_.InitRootTracker(NULL, total_mem); - BufferPool pool(TEST_BUFFER_LEN, total_mem); - ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); - client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem); - ASSERT_TRUE(client_tracker->IncreaseReservation(total_mem)); - BufferPool::Client client; - ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client)); - - vector<BufferPool::PageHandle> handles(num_pages); - - // Create pages of various valid sizes. - for (int i = 0; i < num_pages; ++i) { - int size_multiple = 1 << i; - int64_t page_len = TEST_BUFFER_LEN * size_multiple; - int64_t used_before = client_tracker->GetUsedReservation(); - ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i])); - ASSERT_TRUE(handles[i].is_open()); - ASSERT_TRUE(handles[i].is_pinned()); - ASSERT_TRUE(handles[i].buffer_handle() != NULL); - ASSERT_TRUE(handles[i].data() != NULL); - ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data()); - ASSERT_EQ(handles[i].len(), page_len); - ASSERT_EQ(handles[i].buffer_handle()->len(), page_len); - DCHECK_EQ(client_tracker->GetUsedReservation(), used_before + page_len); - } - - // Close the handles and check memory consumption. - for (int i = 0; i < num_pages; ++i) { - int64_t used_before = client_tracker->GetUsedReservation(); - int page_len = handles[i].len(); - pool.DestroyPage(&client, &handles[i]); - DCHECK_EQ(client_tracker->GetUsedReservation(), used_before - page_len); - } - - pool.DeregisterClient(&client); - client_tracker->Close(); - - // All the reservations should be released at this point. 
- DCHECK_EQ(global_reservations_.GetReservation(), 0); - global_reservations_.Close(); -} - -TEST_F(BufferPoolTest, BufferAllocation) { - // Allocate many buffers, each a power-of-two multiple of the minimum buffer length. - int num_buffers = 16; - int64_t max_buffer_len = TEST_BUFFER_LEN << (num_buffers - 1); - int64_t total_mem = 2 * 2 * max_buffer_len; - global_reservations_.InitRootTracker(NULL, total_mem); - BufferPool pool(TEST_BUFFER_LEN, total_mem); - ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); - client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem); - ASSERT_TRUE(client_tracker->IncreaseReservationToFit(total_mem)); - BufferPool::Client client; - ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client)); - - vector<BufferPool::BufferHandle> handles(num_buffers); - - // Create buffers of various valid sizes. - for (int i = 0; i < num_buffers; ++i) { - int size_multiple = 1 << i; - int64_t buffer_len = TEST_BUFFER_LEN * size_multiple; - int64_t used_before = client_tracker->GetUsedReservation(); - ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i])); - ASSERT_TRUE(handles[i].is_open()); - ASSERT_TRUE(handles[i].data() != NULL); - ASSERT_EQ(handles[i].len(), buffer_len); - DCHECK_EQ(client_tracker->GetUsedReservation(), used_before + buffer_len); - } - - // Close the handles and check memory consumption. - for (int i = 0; i < num_buffers; ++i) { - int64_t used_before = client_tracker->GetUsedReservation(); - int buffer_len = handles[i].len(); - pool.FreeBuffer(&client, &handles[i]); - DCHECK_EQ(client_tracker->GetUsedReservation(), used_before - buffer_len); - } - - pool.DeregisterClient(&client); - client_tracker->Close(); - - // All the reservations should be released at this point. - DCHECK_EQ(global_reservations_.GetReservation(), 0); - global_reservations_.Close(); -} - -/// Test transfer of buffer handles between clients. 
-TEST_F(BufferPoolTest, BufferTransfer) { - // Each client needs to have enough reservation for a buffer. - const int num_clients = 5; - int64_t total_mem = num_clients * TEST_BUFFER_LEN; - global_reservations_.InitRootTracker(NULL, total_mem); - BufferPool pool(TEST_BUFFER_LEN, total_mem); - ReservationTracker client_trackers[num_clients]; - BufferPool::Client clients[num_clients]; - BufferPool::BufferHandle handles[num_clients]; - for (int i = 0; i < num_clients; ++i) { - client_trackers[i].InitChildTracker( - NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN); - ASSERT_TRUE(client_trackers[i].IncreaseReservationToFit(TEST_BUFFER_LEN)); - ASSERT_OK(pool.RegisterClient( - "test client", &client_trackers[i], NewProfile(), &clients[i])); - } - - // Transfer the page around between the clients repeatedly in a circle. - ASSERT_OK(pool.AllocateBuffer(&clients[0], TEST_BUFFER_LEN, &handles[0])); - uint8_t* data = handles[0].data(); - for (int iter = 0; iter < 10; ++iter) { - for (int client = 0; client < num_clients; ++client) { - int next_client = (client + 1) % num_clients; - ASSERT_OK(pool.TransferBuffer(&clients[client], &handles[client], - &clients[next_client], &handles[next_client])); - // Check that the transfer left things in a consistent state. - ASSERT_FALSE(handles[client].is_open()); - ASSERT_EQ(0, client_trackers[client].GetUsedReservation()); - ASSERT_TRUE(handles[next_client].is_open()); - ASSERT_EQ(TEST_BUFFER_LEN, client_trackers[next_client].GetUsedReservation()); - // The same underlying buffer should be used. - ASSERT_EQ(data, handles[next_client].data()); - } - } - - pool.FreeBuffer(&clients[0], &handles[0]); - for (int i = 0; i < num_clients; ++i) { - pool.DeregisterClient(&clients[i]); - client_trackers[i].Close(); - } - DCHECK_EQ(global_reservations_.GetReservation(), 0); - global_reservations_.Close(); -} - -/// Test basic pinning and unpinning. 
-TEST_F(BufferPoolTest, Pin) { - int64_t total_mem = TEST_BUFFER_LEN * 1024; - // Set up client with enough reservation to pin twice. - int64_t child_reservation = TEST_BUFFER_LEN * 2; - BufferPool pool(TEST_BUFFER_LEN, total_mem); - global_reservations_.InitRootTracker(NULL, total_mem); - ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); - client_tracker->InitChildTracker( - NewProfile(), &global_reservations_, NULL, child_reservation); - ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation)); - BufferPool::Client client; - ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client)); - - BufferPool::PageHandle handle1, handle2; - - // Can pin two minimum sized pages. - ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1)); - ASSERT_TRUE(handle1.is_open()); - ASSERT_TRUE(handle1.is_pinned()); - ASSERT_TRUE(handle1.data() != NULL); - ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2)); - ASSERT_TRUE(handle2.is_open()); - ASSERT_TRUE(handle2.is_pinned()); - ASSERT_TRUE(handle2.data() != NULL); - - pool.Unpin(&client, &handle2); - ASSERT_FALSE(handle2.is_pinned()); - - // Can pin minimum-sized page twice. - ASSERT_OK(pool.Pin(&client, &handle1)); - ASSERT_TRUE(handle1.is_pinned()); - // Have to unpin twice. - pool.Unpin(&client, &handle1); - ASSERT_TRUE(handle1.is_pinned()); - pool.Unpin(&client, &handle1); - ASSERT_FALSE(handle1.is_pinned()); - - // Can pin double-sized page only once. - BufferPool::PageHandle double_handle; - ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle)); - ASSERT_TRUE(double_handle.is_open()); - ASSERT_TRUE(double_handle.is_pinned()); - ASSERT_TRUE(double_handle.data() != NULL); - - // Destroy the pages - test destroying both pinned and unpinned. 
- pool.DestroyPage(&client, &handle1); - pool.DestroyPage(&client, &handle2); - pool.DestroyPage(&client, &double_handle); - - pool.DeregisterClient(&client); - client_tracker->Close(); -} - -/// Creating a page or pinning without sufficient reservation should DCHECK. -TEST_F(BufferPoolTest, PinWithoutReservation) { - int64_t total_mem = TEST_BUFFER_LEN * 1024; - BufferPool pool(TEST_BUFFER_LEN, total_mem); - global_reservations_.InitRootTracker(NULL, total_mem); - ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); - client_tracker->InitChildTracker( - NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN); - BufferPool::Client client; - ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client)); - - BufferPool::PageHandle handle; - IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle), ""); - - // Should succeed after increasing reservation. - ASSERT_TRUE(client_tracker->IncreaseReservationToFit(TEST_BUFFER_LEN)); - ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle)); - - // But we can't pin again. - IMPALA_ASSERT_DEBUG_DEATH(pool.Pin(&client, &handle), ""); - - pool.DestroyPage(&client, &handle); - pool.DeregisterClient(&client); - client_tracker->Close(); -} - -TEST_F(BufferPoolTest, ExtractBuffer) { - int64_t total_mem = TEST_BUFFER_LEN * 1024; - // Set up client with enough reservation for two buffers/pins. 
- int64_t child_reservation = TEST_BUFFER_LEN * 2; - BufferPool pool(TEST_BUFFER_LEN, total_mem); - global_reservations_.InitRootTracker(NULL, total_mem); - ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); - client_tracker->InitChildTracker( - NewProfile(), &global_reservations_, NULL, child_reservation); - ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation)); - BufferPool::Client client; - ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client)); - - BufferPool::PageHandle page; - BufferPool::BufferHandle buffer; - - // Test basic buffer extraction. - for (int len = TEST_BUFFER_LEN; len <= 2 * TEST_BUFFER_LEN; len *= 2) { - ASSERT_OK(pool.CreatePage(&client, len, &page)); - uint8_t* page_data = page.data(); - pool.ExtractBuffer(&client, &page, &buffer); - ASSERT_FALSE(page.is_open()); - ASSERT_TRUE(buffer.is_open()); - ASSERT_EQ(len, buffer.len()); - ASSERT_EQ(page_data, buffer.data()); - ASSERT_EQ(len, client_tracker->GetUsedReservation()); - pool.FreeBuffer(&client, &buffer); - ASSERT_EQ(0, client_tracker->GetUsedReservation()); - } - - // Test that ExtractBuffer() accounts correctly for pin count > 1. - ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page)); - uint8_t* page_data = page.data(); - ASSERT_OK(pool.Pin(&client, &page)); - ASSERT_EQ(TEST_BUFFER_LEN * 2, client_tracker->GetUsedReservation()); - pool.ExtractBuffer(&client, &page, &buffer); - ASSERT_EQ(TEST_BUFFER_LEN, client_tracker->GetUsedReservation()); - ASSERT_FALSE(page.is_open()); - ASSERT_TRUE(buffer.is_open()); - ASSERT_EQ(TEST_BUFFER_LEN, buffer.len()); - ASSERT_EQ(page_data, buffer.data()); - pool.FreeBuffer(&client, &buffer); - ASSERT_EQ(0, client_tracker->GetUsedReservation()); - - // Test that ExtractBuffer() DCHECKs for unpinned pages. 
- ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page)); - pool.Unpin(&client, &page); - IMPALA_ASSERT_DEBUG_DEATH(pool.ExtractBuffer(&client, &page, &buffer), ""); - pool.DestroyPage(&client, &page); - - pool.DeregisterClient(&client); - client_tracker->Close(); -} - -// Test concurrent creation and destruction of pages. -TEST_F(BufferPoolTest, ConcurrentPageCreation) { - int ops_per_thread = 1024; - int num_threads = 64; - // Need enough buffers for all initial reservations. - int total_mem = num_threads * TEST_BUFFER_LEN; - global_reservations_.InitRootTracker(NULL, total_mem); - - BufferPool pool(TEST_BUFFER_LEN, total_mem); - - // Launch threads, each with a different set of query IDs. - thread_group workers; - for (int i = 0; i < num_threads; ++i) { - workers.add_thread(new thread(bind(&BufferPoolTest::CreatePageLoop, this, &pool, - &global_reservations_, ops_per_thread))); - } - - // Build debug string to test concurrent iteration over pages_ list. - for (int i = 0; i < 64; ++i) { - LOG(INFO) << pool.DebugString(); - } - workers.join_all(); - - // All the reservations should be released at this point. 
- DCHECK_EQ(global_reservations_.GetChildReservations(), 0); - global_reservations_.Close(); -} - -void BufferPoolTest::CreatePageLoop( - BufferPool* pool, ReservationTracker* parent_tracker, int num_ops) { - ReservationTracker client_tracker; - client_tracker.InitChildTracker(NewProfile(), parent_tracker, NULL, TEST_BUFFER_LEN); - BufferPool::Client client; - ASSERT_OK(pool->RegisterClient("test client", &client_tracker, NewProfile(), &client)); - for (int i = 0; i < num_ops; ++i) { - BufferPool::PageHandle handle; - ASSERT_TRUE(client_tracker.IncreaseReservation(TEST_BUFFER_LEN)); - ASSERT_OK(pool->CreatePage(&client, TEST_BUFFER_LEN, &handle)); - pool->Unpin(&client, &handle); - ASSERT_OK(pool->Pin(&client, &handle)); - pool->DestroyPage(&client, &handle); - client_tracker.DecreaseReservation(TEST_BUFFER_LEN); - } - pool->DeregisterClient(&client); - client_tracker.Close(); -} - -/// Test error path where pool is unable to fulfill a reservation because it cannot evict -/// unpinned pages. -TEST_F(BufferPoolTest, CapacityExhausted) { - global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN); - // TODO: once we enable spilling, set up buffer pool so that spilling is disabled. - // Set up pool with one more buffer than reservations (so that we hit the reservation - // limit instead of the buffer limit). - BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN * 2); - - BufferPool::PageHandle handle1, handle2, handle3; - - BufferPool::Client client; - ASSERT_OK( - pool.RegisterClient("test client", &global_reservations_, NewProfile(), &client)); - ASSERT_TRUE(global_reservations_.IncreaseReservation(TEST_BUFFER_LEN)); - ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1)); - - // Do not have enough reservations because we pinned the page. - IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2), ""); - - // Even with reservations, we can only create one more unpinned page because we don't - // support eviction yet. 
- pool.Unpin(&client, &handle1); - ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2)); - pool.Unpin(&client, &handle2); - ASSERT_FALSE(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3).ok()); - - // After destroying a page, we should have a free buffer that we can use. - pool.DestroyPage(&client, &handle1); - ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3)); - - pool.DestroyPage(&client, &handle2); - pool.DestroyPage(&client, &handle3); - pool.DeregisterClient(&client); -} -} - -IMPALA_TEST_MAIN(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/buffer-pool.cc ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/buffer-pool.cc b/be/src/bufferpool/buffer-pool.cc deleted file mode 100644 index 3035694..0000000 --- a/be/src/bufferpool/buffer-pool.cc +++ /dev/null @@ -1,439 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "bufferpool/buffer-pool.h" - -#include <boost/bind.hpp> -#include <limits> -#include <sstream> - -#include "bufferpool/reservation-tracker.h" -#include "common/names.h" -#include "gutil/strings/substitute.h" -#include "util/bit-util.h" -#include "util/runtime-profile-counters.h" -#include "util/uid-util.h" - -namespace impala { - -/// The internal representation of a page, which can be pinned or unpinned. If the -/// page is pinned, a buffer is associated with the page. -/// -/// Code manipulating the page is responsible for acquiring 'lock' when reading or -/// modifying the page. -struct BufferPool::Page : public BufferPool::PageList::Node { - Page(int64_t len) : len(len), pin_count(0), dirty(false) {} - - /// Increment the pin count. Caller must hold 'lock'. - void IncrementPinCount(PageHandle* handle) { - lock.DCheckLocked(); - ++pin_count; - // Pinned page buffers may be modified by anyone with a pointer to the buffer, so we - // have to assume they are dirty. - dirty = true; - } - - /// Decrement the pin count. Caller must hold 'lock'. - void DecrementPinCount(PageHandle* handle) { - lock.DCheckLocked(); - DCHECK(pin_count > 0); - --pin_count; - } - - string DebugString() { - return Substitute("<BufferPool::Page> $0 len: $1 pin_count: $2 buf: $3 dirty: $4", this, - len, pin_count, buffer.DebugString(), dirty); - } - - // Helper for BufferPool::DebugString(). - static bool DebugStringCallback(stringstream* ss, BufferPool::Page* page) { - lock_guard<SpinLock> pl(page->lock); - (*ss) << page->DebugString() << "\n"; - return true; - } - - /// The length of the page in bytes. - const int64_t len; - - /// Lock to protect the below members of Page. The lock must be held when modifying any - /// of the below members and when reading any of the below members of an unpinned page. - SpinLock lock; - - /// The pin count of the page. - int pin_count; - - /// Buffer with the page's contents, Always open if pinned. 
Closed if page is unpinned - /// and was evicted from memory. - BufferHandle buffer; - - /// True if the buffer's contents need to be saved before evicting it from memory. - bool dirty; -}; - -BufferPool::BufferHandle::BufferHandle() { - Reset(); -} - -BufferPool::BufferHandle::BufferHandle(BufferHandle&& src) { - *this = std::move(src); -} - -BufferPool::BufferHandle& BufferPool::BufferHandle::operator=(BufferHandle&& src) { - DCHECK(!is_open()); - // Copy over all members then close src. - client_ = src.client_; - data_ = src.data_; - len_ = src.len_; - src.Reset(); - return *this; -} - -void BufferPool::BufferHandle::Open(const Client* client, uint8_t* data, int64_t len) { - client_ = client; - data_ = data; - len_ = len; -} - -void BufferPool::BufferHandle::Reset() { - client_ = NULL; - data_ = NULL; - len_ = -1; -} - -BufferPool::PageHandle::PageHandle() { - Reset(); -} - -BufferPool::PageHandle::PageHandle(PageHandle&& src) { - *this = std::move(src); -} - -BufferPool::PageHandle& BufferPool::PageHandle::operator=(PageHandle&& src) { - DCHECK(!is_open()); - // Copy over all members then close src. - page_ = src.page_; - client_ = src.client_; - src.Reset(); - return *this; -} - -void BufferPool::PageHandle::Open(Page* page, Client* client) { - DCHECK(!is_open()); - page->lock.DCheckLocked(); - page_ = page; - client_ = client; -} - -void BufferPool::PageHandle::Reset() { - page_ = NULL; - client_ = NULL; -} - -int BufferPool::PageHandle::pin_count() const { - DCHECK(is_open()); - // The pin count can only be modified via this PageHandle, which must not be - // concurrently accessed by multiple threads, so it is safe to access without locking - return page_->pin_count; -} - -int64_t BufferPool::PageHandle::len() const { - DCHECK(is_open()); - // The length of the page cannot change, so it is safe to access without locking. 
- return page_->len; -} - -const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const { - DCHECK(is_pinned()); - // The 'buffer' field cannot change while the page is pinned, so it is safe to access - // without locking. - return &page_->buffer; -} - -BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit) - : allocator_(new BufferAllocator(min_buffer_len)), - min_buffer_len_(min_buffer_len), - buffer_bytes_limit_(buffer_bytes_limit), - buffer_bytes_remaining_(buffer_bytes_limit) { - DCHECK_GT(min_buffer_len, 0); - DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len)); -} - -BufferPool::~BufferPool() { - DCHECK(pages_.empty()); -} - -Status BufferPool::RegisterClient( - const string& name, ReservationTracker* reservation, RuntimeProfile* profile, - Client* client) { - DCHECK(!client->is_registered()); - DCHECK(reservation != NULL); - client->InitCounters(profile); - client->reservation_ = reservation; - client->name_ = name; - return Status::OK(); -} - -void BufferPool::Client::InitCounters(RuntimeProfile* profile) { - counters_.get_buffer_time = ADD_TIMER(profile, "BufferPoolGetBufferTime"); - counters_.read_wait_time = ADD_TIMER(profile, "BufferPoolReadWaitTime"); - counters_.write_wait_time = ADD_TIMER(profile, "BufferPoolWriteWaitTime"); - counters_.peak_unpinned_bytes = - profile->AddHighWaterMarkCounter("BufferPoolPeakUnpinnedBytes", TUnit::BYTES); - counters_.total_unpinned_bytes = - ADD_COUNTER(profile, "BufferPoolTotalUnpinnedBytes", TUnit::BYTES); -} - -void BufferPool::DeregisterClient(Client* client) { - if (!client->is_registered()) return; - client->reservation_->Close(); - client->name_.clear(); - client->reservation_ = NULL; -} - -Status BufferPool::CreatePage(Client* client, int64_t len, PageHandle* handle) { - DCHECK(!handle->is_open()); - DCHECK_GE(len, min_buffer_len_); - DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len)); - - BufferHandle buffer; - // No changes have been made to state yet, 
so we can cleanly return on error. - RETURN_IF_ERROR(AllocateBufferInternal(client, len, &buffer)); - - Page* page = new Page(len); - { - lock_guard<SpinLock> pl(page->lock); - page->buffer = std::move(buffer); - handle->Open(page, client); - page->IncrementPinCount(handle); - } - - // Only add to globally-visible list after page is initialized. The page lock also - // needs to be released before enqueueing to respect the lock ordering. - pages_.Enqueue(page); - - client->reservation_->AllocateFrom(len); - return Status::OK(); -} - -void BufferPool::DestroyPage(Client* client, PageHandle* handle) { - if (!handle->is_open()) return; // DestroyPage() should be idempotent. - - Page* page = handle->page_; - if (handle->is_pinned()) { - // In the pinned case, delegate to ExtractBuffer() and FreeBuffer() to do the work - // of cleaning up the page and freeing the buffer. - BufferHandle buffer; - ExtractBuffer(client, handle, &buffer); - FreeBuffer(client, &buffer); - return; - } - - { - lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its state. - // In the unpinned case, no reservation is consumed, so just free the buffer. - // TODO: wait for in-flight writes for 'page' so we can safely free 'page'. - if (page->buffer.is_open()) FreeBufferInternal(&page->buffer); - } - CleanUpPage(handle); -} - -void BufferPool::CleanUpPage(PageHandle* handle) { - // Remove the destroyed page from data structures in a way that ensures no other - // threads have a remaining reference. Threads that access pages via the 'pages_' - // list hold 'pages_.lock_', so Remove() will not return until those threads are done - // and it is safe to delete page. 
- pages_.Remove(handle->page_); - delete handle->page_; - handle->Reset(); -} - -Status BufferPool::Pin(Client* client, PageHandle* handle) { - DCHECK(client->is_registered()); - DCHECK(handle->is_open()); - DCHECK_EQ(handle->client_, client); - - Page* page = handle->page_; - { - lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its state. - if (page->pin_count == 0) { - if (!page->buffer.is_open()) { - // No changes have been made to state yet, so we can cleanly return on error. - RETURN_IF_ERROR(AllocateBufferInternal(client, page->len, &page->buffer)); - - // TODO: will need to initiate/wait for read if the page is not in-memory. - } - COUNTER_ADD(client->counters_.peak_unpinned_bytes, -handle->len()); - } - page->IncrementPinCount(handle); - } - - client->reservation_->AllocateFrom(page->len); - return Status::OK(); -} - -void BufferPool::Unpin(Client* client, PageHandle* handle) { - DCHECK(handle->is_open()); - lock_guard<SpinLock> pl(handle->page_->lock); - UnpinLocked(client, handle); -} - -void BufferPool::UnpinLocked(Client* client, PageHandle* handle) { - DCHECK(client->is_registered()); - DCHECK_EQ(handle->client_, client); - // If handle is pinned, we can assume that the page itself is pinned. - DCHECK(handle->is_pinned()); - Page* page = handle->page_; - page->lock.DCheckLocked(); - - page->DecrementPinCount(handle); - client->reservation_->ReleaseTo(page->len); - - COUNTER_ADD(client->counters_.total_unpinned_bytes, handle->len()); - COUNTER_ADD(client->counters_.peak_unpinned_bytes, handle->len()); - - // TODO: can evict now. Only need to preserve contents if 'page->dirty' is true. -} - -void BufferPool::ExtractBuffer( - Client* client, PageHandle* page_handle, BufferHandle* buffer_handle) { - DCHECK(page_handle->is_pinned()); - - DCHECK_EQ(page_handle->client_, client); - - Page* page = page_handle->page_; - { - lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its state. 
- // TODO: wait for in-flight writes for 'page' so we can safely free 'page'. - - // Bring the pin count to 1 so that we're not using surplus reservations. - while (page->pin_count > 1) UnpinLocked(client, page_handle); - *buffer_handle = std::move(page->buffer); - } - CleanUpPage(page_handle); -} - -Status BufferPool::AllocateBuffer(Client* client, int64_t len, BufferHandle* handle) { - client->reservation_->AllocateFrom(len); - return AllocateBufferInternal(client, len, handle); -} - -Status BufferPool::AllocateBufferInternal( - Client* client, int64_t len, BufferHandle* buffer) { - DCHECK(!buffer->is_open()); - DCHECK_GE(len, min_buffer_len_); - DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len)); - SCOPED_TIMER(client->counters_.get_buffer_time); - - // If there is headroom in 'buffer_bytes_remaining_', we can just allocate a new buffer. - if (TryDecreaseBufferBytesRemaining(len)) { - uint8_t* data; - Status status = allocator_->Allocate(len, &data); - if (!status.ok()) { - buffer_bytes_remaining_.Add(len); - return status; - } - DCHECK(data != NULL); - buffer->Open(client, data, len); - return Status::OK(); - } - - // If there is no remaining capacity, we must evict another page. - return Status(TErrorCode::NOT_IMPLEMENTED_ERROR, - Substitute("Buffer bytes limit $0 of buffer pool is exhausted and page eviction is " - "not implemented yet!", buffer_bytes_limit_)); -} - -void BufferPool::FreeBuffer(Client* client, BufferHandle* handle) { - if (!handle->is_open()) return; // Should be idempotent. 
- DCHECK_EQ(client, handle->client_); - client->reservation_->ReleaseTo(handle->len_); - FreeBufferInternal(handle); -} - -void BufferPool::FreeBufferInternal(BufferHandle* handle) { - DCHECK(handle->is_open()); - allocator_->Free(handle->data(), handle->len()); - buffer_bytes_remaining_.Add(handle->len()); - handle->Reset(); -} - -Status BufferPool::TransferBuffer( - Client* src_client, BufferHandle* src, Client* dst_client, BufferHandle* dst) { - DCHECK(src->is_open()); - DCHECK(!dst->is_open()); - DCHECK_EQ(src_client, src->client_); - DCHECK_NE(src, dst); - DCHECK_NE(src_client, dst_client); - - dst_client->reservation_->AllocateFrom(src->len()); - src_client->reservation_->ReleaseTo(src->len()); - *dst = std::move(*src); - dst->client_ = dst_client; - return Status::OK(); -} - -bool BufferPool::TryDecreaseBufferBytesRemaining(int64_t len) { - // TODO: we may want to change this policy so that we don't always use up to the limit - // for buffers, since this may starve other operators using non-buffer-pool memory. 
- while (true) { - int64_t old_value = buffer_bytes_remaining_.Load(); - if (old_value < len) return false; - int64_t new_value = old_value - len; - if (buffer_bytes_remaining_.CompareAndSwap(old_value, new_value)) { - return true; - } - } -} - -string BufferPool::Client::DebugString() const { - if (is_registered()) { - return Substitute("<BufferPool::Client> $0 name: $1 reservation: {$2}", this, name_, - reservation_->DebugString()); - } else { - return Substitute("<BufferPool::Client> $0 UNREGISTERED", this); - } -} - -string BufferPool::PageHandle::DebugString() const { - if (is_open()) { - lock_guard<SpinLock> pl(page_->lock); - return Substitute( - "<BufferPool::PageHandle> $0 client: {$1} page: {$2}", - this, client_->DebugString(), page_->DebugString()); - } else { - return Substitute("<BufferPool::PageHandle> $0 CLOSED", this); - } -} - -string BufferPool::BufferHandle::DebugString() const { - if (is_open()) { - return Substitute("<BufferPool::BufferHandle> $0 client: {$1} data: $2 len: $3", this, - client_->DebugString(), data_, len_); - } else { - return Substitute("<BufferPool::BufferHandle> $0 CLOSED", this); - } -} - -string BufferPool::DebugString() { - stringstream ss; - ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_ - << " buffer_bytes_limit: " << buffer_bytes_limit_ - << " buffer_bytes_remaining: " << buffer_bytes_remaining_.Load() << "\n"; - pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1)); - return ss.str(); -} -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/buffer-pool.h ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/buffer-pool.h b/be/src/bufferpool/buffer-pool.h deleted file mode 100644 index 6a9641d..0000000 --- a/be/src/bufferpool/buffer-pool.h +++ /dev/null @@ -1,426 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_BUFFER_POOL_H -#define IMPALA_BUFFER_POOL_H - -#include <stdint.h> -#include <boost/scoped_ptr.hpp> -#include <boost/thread/locks.hpp> -#include <string> - -#include "bufferpool/buffer-allocator.h" -#include "bufferpool/buffer-pool-counters.h" -#include "common/atomic.h" -#include "common/status.h" -#include "gutil/macros.h" -#include "util/internal-queue.h" -#include "util/spinlock.h" - -namespace impala { - -class BufferAllocator; -class ReservationTracker; - -/// A buffer pool that manages memory buffers for all queries in an Impala daemon. -/// The buffer pool enforces buffer reservations, limits, and implements policies -/// for moving spilled memory from in-memory buffers to disk. It also enables reuse of -/// buffers between queries, to avoid frequent allocations. -/// -/// The buffer pool can be used for allocating any large buffers (above a configurable -/// minimum length), whether or not the buffers will be spilled. Smaller allocations -/// are not serviced directly by the buffer pool: clients of the buffer pool must -/// subdivide buffers if they wish to use smaller allocations. -/// -/// All buffer pool operations are in the context of a registered buffer pool client. 
-/// A buffer pool client should be created for every allocator of buffers at the level -/// of granularity required for reporting and enforcement of reservations, e.g. an exec -/// node. The client tracks buffer reservations via its ReservationTracker and also -/// includes info that is helpful for debugging (e.g. the operator that is associated -/// with the buffer). The client is not threadsafe, i.e. concurrent buffer pool -/// operations should not be invoked for the same client. -/// -/// TODO: -/// * Implement spill-to-disk. -/// * Decide on, document, and enforce upper limits on page size. -/// -/// Pages, Buffers and Pinning -/// ========================== -/// * A page is a logical block of memory that can reside in memory or on disk. -/// * A buffer is a physical block of memory that can hold a page in memory. -/// * A page handle is used by buffer pool clients to identify and access a page and -/// the corresponding buffer. Clients do not interact with pages directly. -/// * A buffer handle is used by buffer pool clients to identify and access a buffer. -/// * A page is pinned if it has pin count > 0. A pinned page stays mapped to the same -/// buffer. -/// * An unpinned page can be written out to disk by the buffer pool so that the buffer -/// can be used for another purpose. -/// -/// Buffer/Page Sizes -/// ================= -/// The buffer pool has a minimum buffer size, which must be a power-of-two. Page and -/// buffer sizes must be an exact power-of-two multiple of the minimum buffer size. -/// -/// Reservations -/// ============ -/// Before allocating buffers or pinning pages, a client must reserve memory through its -/// ReservationTracker. Reservation of n bytes gives a client the right to allocate -/// buffers or pin pages summing up to n bytes.
Reservations are both necessary and -/// sufficient for a client to allocate buffers or pin pages: the operations succeed -/// unless a "system error" such as a disk write error is encountered that prevents -/// unpinned pages from being written to disk. -/// -/// More memory may be reserved than is used, e.g. if a client is not using its full -/// reservation. In such cases, the buffer pool can use the free buffers in any way, -/// e.g. for keeping unpinned pages in memory, so long as it is able to fulfill the -/// reservations when needed, e.g. by flushing unpinned pages to disk. -/// -/// Page/Buffer Handles -/// =================== -/// The buffer pool exposes PageHandles and BufferHandles, which are owned by clients of -/// the buffer pool, and act as a proxy for the internal data structure representing the -/// page or buffer in the buffer pool. Handles are "open" if they are associated with a -/// page or buffer. An open PageHandle is obtained by creating a page. PageHandles are -/// closed by calling BufferPool::DestroyPage(). An open BufferHandle is obtained by -/// allocating a buffer or extracting a BufferHandle from a PageHandle. A page's buffer -/// can also be accessed through the PageHandle. The handle destructors check for -/// resource leaks, e.g. an open handle that would result in a buffer leak. -/// -/// Pin Counting of Page Handles: -/// ---------------------------------- -/// Page handles are scoped to a client. The invariants are as follows: -/// * A page can only be accessed through an open handle. -/// * A page is destroyed once the handle is destroyed via DestroyPage(). -/// * A page's buffer can only be accessed through a pinned handle. -/// * Pin() can be called on an open handle, incrementing the handle's pin count. -/// * Unpin() can be called on a pinned handle, but not an unpinned handle. -/// * Pin() always increases usage of reservations, and Unpin() always decreases usage, -/// i.e.
the handle consumes <pin count> * <page size> bytes of reservation. -/// -/// Example Usage: Buffers -/// ================================== -/// The simplest use case is to allocate a memory buffer. -/// * The new buffer is created with AllocateBuffer(). -/// * The client reads and writes to the buffer as it sees fit. -/// * If the client is done with the buffer's contents it can call FreeBuffer() to -/// destroy the handle and free the buffer, or use TransferBuffer() to transfer -/// the buffer to a different client. -/// -/// Example Usage: Spillable Pages -/// ============================== -/// * A spilling operator creates a new page with CreatePage(). -/// * The client reads and writes to the page's buffer as it sees fit. -/// * If the operator encounters memory pressure, it can decrease reservation usage by -/// calling Unpin() on the page. The page may then be written to disk and its buffer -/// repurposed internally by BufferPool. -/// * Once the operator needs the page's contents again and has sufficient unused -/// reservation, it can call Pin(), which brings the page's contents back into memory, -/// perhaps in a different buffer. Therefore the operator must fix up any pointers into -/// the previous buffer. -/// * If the operator is done with the page, it can call DestroyPage() to destroy the -/// handle and release resources, or call ExtractBuffer() to extract the buffer. -/// -/// Synchronization -/// =============== -/// The data structures in the buffer pool itself are thread-safe. Client-owned data -/// structures - Client, PageHandle and BufferHandle - are not protected from concurrent -/// access by the buffer pool: clients must ensure that they do not invoke concurrent -/// operations with the same Client, PageHandle or BufferHandle.
-// -/// +========================+ -/// | IMPLEMENTATION DETAILS | -/// +========================+ -/// -/// Lock Ordering -/// ============= -/// The lock ordering is: -/// * pages_::lock_ -> Page::lock_ -/// -/// If a reference to a page is acquired via the pages_ list, pages_::lock_ must be held -/// until done with the page to ensure the page isn't concurrently deleted. -class BufferPool { - public: - class BufferHandle; - class Client; - class PageHandle; - - /// Constructs a new buffer pool. - /// 'min_buffer_len': the minimum buffer length for the pool. Must be a power of two. - /// 'buffer_bytes_limit': the maximum physical memory in bytes that can be used by the - /// buffer pool. If 'buffer_bytes_limit' is not a multiple of 'min_buffer_len', the - /// remainder will not be usable. - BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit); - ~BufferPool(); - - /// Register a client. Returns an error status and does not register the client if the - /// arguments are invalid. 'name' is an arbitrary name used to identify the client in - /// any error messages or logging. Counters for this client are added to the (non-NULL) - /// 'profile'. 'client' is the client to register. 'client' should not already be - /// registered. - Status RegisterClient(const std::string& name, ReservationTracker* reservation, - RuntimeProfile* profile, Client* client); - - /// Deregister 'client' if it is registered. Idempotent. - void DeregisterClient(Client* client); - - /// Create a new page of 'len' bytes with pin count 1. 'len' must be a page length - /// supported by BufferPool (see BufferPool class comment). The client must have - /// sufficient unused reservation to pin the new page (otherwise it will DCHECK). - /// CreatePage() only fails when a system error prevents the buffer pool from fulfilling - /// the reservation. - /// On success, the handle is mapped to the new page.
- Status CreatePage(Client* client, int64_t len, PageHandle* handle); - - /// Increment the pin count of 'handle'. After Pin() the underlying page will - /// be mapped to a buffer, which will be accessible through 'handle'. Uses - /// reservation from 'client'. The caller is responsible for ensuring it has enough - /// unused reservation before calling Pin() (otherwise it will DCHECK). Pin() only - /// fails when a system error prevents the buffer pool from fulfilling the reservation. - /// 'handle' must be open. - Status Pin(Client* client, PageHandle* handle); - - /// Decrement the pin count of 'handle'. Decrease client's reservation usage. If the - /// handle's pin count becomes zero, it is no longer valid for the underlying page's - /// buffer to be accessed via 'handle'. If the page's total pin count across all - /// handles that reference it goes to zero, the page's data may be written to disk and - /// the buffer reclaimed. 'handle' must be open and have a pin count > 0. - /// TODO: once we implement spilling, it will be an error to call Unpin() with - /// spilling disabled. E.g. if Impala is running without scratch (we want to be - /// able to test Unpin() before we implement actual spilling). - void Unpin(Client* client, PageHandle* handle); - - /// Destroy the page referenced by 'handle' (if 'handle' is open). Any buffers or disk - /// storage backing the page are freed. Idempotent. If the page is pinned, the - /// reservation usage is decreased accordingly. - void DestroyPage(Client* client, PageHandle* handle); - - /// Extracts buffer from a pinned page. After this returns, the page referenced by - /// 'page_handle' will be destroyed and 'buffer_handle' will reference the buffer from - /// 'page_handle'. This may decrease reservation usage of 'client' if the page was - /// pinned multiple times via 'page_handle'. - void ExtractBuffer( - Client* client, PageHandle* page_handle, BufferHandle* buffer_handle); - - /// Allocates a new buffer of 'len' bytes. 
Uses reservation from 'client'. The caller - /// is responsible for ensuring it has enough unused reservation before calling - /// AllocateBuffer() (otherwise it will DCHECK). AllocateBuffer() only fails when - /// a system error prevents the buffer pool from fulfilling the reservation. - Status AllocateBuffer(Client* client, int64_t len, BufferHandle* handle); - - /// If 'handle' is open, close 'handle', free the buffer and decrease the - /// reservation usage from 'client'. Idempotent. - void FreeBuffer(Client* client, BufferHandle* handle); - - /// Transfer ownership of buffer from 'src_client' to 'dst_client' and move the - /// handle from 'src' to 'dst'. Increases reservation usage in 'dst_client' and - /// decreases reservation usage in 'src_client'. 'src' must be open and 'dst' must be - /// closed before calling. 'src'/'dst' and 'src_client'/'dst_client' must be different. - /// After a successful call, 'src' is closed and 'dst' is open. - Status TransferBuffer(Client* src_client, BufferHandle* src, Client* dst_client, - BufferHandle* dst); - - /// Print a debug string with the state of the buffer pool. - std::string DebugString(); - - int64_t min_buffer_len() const; - int64_t buffer_bytes_limit() const; - - private: - DISALLOW_COPY_AND_ASSIGN(BufferPool); - struct Page; - - /// Same as Unpin(), except the lock for the page referenced by 'handle' must be held - /// by the caller. - void UnpinLocked(Client* client, PageHandle* handle); - - /// Perform the cleanup of the page object and handle when the page is destroyed. - /// Reset 'handle', free the Page object and remove the 'pages_' entry. - /// The 'handle->page_' lock should *not* be held by the caller. - void CleanUpPage(PageHandle* handle); - - /// Allocate a buffer of length 'len'. Assumes that the client's reservation has already - /// been consumed for the buffer. Returns an error if the pool is unable to fulfill the - /// reservation.
- Status AllocateBufferInternal(Client* client, int64_t len, BufferHandle* buffer); - - /// Frees 'buffer', which must be open before calling. Closes 'buffer' and updates - /// internal state but does not release to any reservation. - void FreeBufferInternal(BufferHandle* buffer); - - /// Check if we can allocate another buffer of size 'len' bytes without - /// 'buffer_bytes_remaining_' going negative. - /// Returns true and decrease 'buffer_bytes_remaining_' by 'len' if successful. - bool TryDecreaseBufferBytesRemaining(int64_t len); - - /// Allocator for allocating and freeing all buffer memory. - boost::scoped_ptr<BufferAllocator> allocator_; - - /// The minimum length of a buffer in bytes. All buffers and pages are a power-of-two - /// multiple of this length. This is always a power of two. - const int64_t min_buffer_len_; - - /// The maximum physical memory in bytes that can be used for buffers. - const int64_t buffer_bytes_limit_; - - /// The remaining number of bytes of 'buffer_bytes_limit_' that can be used for - /// allocating new buffers. Must be updated atomically before a new buffer is - /// allocated or after an existing buffer is freed. - AtomicInt64 buffer_bytes_remaining_; - - /// List containing all pages. Protected by the list's internal lock. - typedef InternalQueue<Page> PageList; - PageList pages_; -}; - -/// External representation of a client of the BufferPool. Clients are used for -/// reservation accounting, and will be used in the future for tracking per-client -/// buffer pool counters. This class is the external handle for a client so -/// each Client instance is owned by the BufferPool's client, rather than the BufferPool. -/// Each Client should only be used by a single thread at a time: concurrently calling -/// Client methods or BufferPool methods with the Client as an argument is not supported. -class BufferPool::Client { - public: - Client() : reservation_(NULL) {} - /// Client must be deregistered. 
- ~Client() { DCHECK(!is_registered()); } - - bool is_registered() const { return reservation_ != NULL; } - ReservationTracker* reservation() { return reservation_; } - - std::string DebugString() const; - - private: - friend class BufferPool; - DISALLOW_COPY_AND_ASSIGN(Client); - - /// Initialize 'counters_' and add the counters to 'profile'. - void InitCounters(RuntimeProfile* profile); - - /// A name identifying the client. - std::string name_; - - /// The reservation tracker for the client. NULL means the client isn't registered. - /// All pages pinned by the client count as usage against 'reservation_'. - ReservationTracker* reservation_; - - /// The RuntimeProfile counters for this client. All non-NULL if is_registered() - /// is true. - BufferPoolClientCounters counters_; -}; - -/// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only -/// be used by a single thread at a time: concurrently calling BufferHandle methods or -/// BufferPool methods with the BufferHandle as an argument is not supported. -class BufferPool::BufferHandle { - public: - BufferHandle(); - ~BufferHandle() { DCHECK(!is_open()); } - - /// Allow move construction of handles, to support std::move(). - BufferHandle(BufferHandle&& src); - - /// Allow move assignment of handles, to support STL classes like std::vector. - /// Destination must be uninitialized. - BufferHandle& operator=(BufferHandle&& src); - - bool is_open() const { return data_ != NULL; } - int64_t len() const { - DCHECK(is_open()); - return len_; - } - /// Get a pointer to the start of the buffer. - uint8_t* data() const { - DCHECK(is_open()); - return data_; - } - - std::string DebugString() const; - - private: - DISALLOW_COPY_AND_ASSIGN(BufferHandle); - friend class BufferPool; - - /// Internal helper to set the handle to an opened state. - void Open(const Client* client, uint8_t* data, int64_t len); - - /// Internal helper to reset the handle to an unopened state. 
- void Reset(); - - /// The client the buffer handle belongs to, used to validate that the correct client - /// is provided in BufferPool method calls. - const Client* client_; - - /// Pointer to the start of the buffer. Non-NULL if open, NULL if closed. - uint8_t* data_; - - /// Length of the buffer in bytes. - int64_t len_; -}; - -/// The handle for a page used by clients of the BufferPool. Each PageHandle should -/// only be used by a single thread at a time: concurrently calling PageHandle methods -/// or BufferPool methods with the PageHandle as an argument is not supported. -class BufferPool::PageHandle { - public: - PageHandle(); - ~PageHandle() { DCHECK(!is_open()); } - - // Allow move construction of page handles, to support std::move(). - PageHandle(PageHandle&& src); - - // Allow move assignment of page handles, to support STL classes like std::vector. - // Destination must be closed. - PageHandle& operator=(PageHandle&& src); - - bool is_open() const { return page_ != NULL; } - bool is_pinned() const { return pin_count() > 0; } - int pin_count() const; - int64_t len() const; - /// Get a pointer to the start of the page's buffer. Only valid to call if the page - /// is pinned via this handle. - uint8_t* data() const { return buffer_handle()->data(); } - - /// Return a pointer to the page's buffer handle. Only valid to call if the page is - /// pinned via this handle. Only const accessors of the returned handle can be used: - /// it is invalid to call FreeBuffer() or TransferBuffer() on it or to otherwise modify - /// the handle. - const BufferHandle* buffer_handle() const; - - std::string DebugString() const; - - private: - DISALLOW_COPY_AND_ASSIGN(PageHandle); - friend class BufferPool; - friend class Page; - - /// Internal helper to open the handle for the given page. - void Open(Page* page, Client* client); - - /// Internal helper to reset the handle to an unopened state. - void Reset(); - - /// The internal page structure. 
NULL if the handle is not open. - Page* page_; - - /// The client the page handle belongs to, used to validate that the correct client - /// is being used. - const Client* client_; -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/reservation-tracker-counters.h ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/reservation-tracker-counters.h b/be/src/bufferpool/reservation-tracker-counters.h deleted file mode 100644 index 0952a2f..0000000 --- a/be/src/bufferpool/reservation-tracker-counters.h +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_RESERVATION_TRACKER_COUNTERS_H -#define IMPALA_RESERVATION_TRACKER_COUNTERS_H - -#include "util/runtime-profile.h" - -namespace impala { - -/// A set of counters for each ReservationTracker for reporting purposes. -/// -/// If the ReservationTracker is linked to a profile these have the same lifetime as that -/// profile, otherwise they have the same lifetime as the ReservationTracker itself. -struct ReservationTrackerCounters { - /// The tracker's peak reservation in bytes. 
- RuntimeProfile::HighWaterMarkCounter* peak_reservation; - - /// The tracker's peak usage in bytes. - RuntimeProfile::HighWaterMarkCounter* peak_used_reservation; - - /// The hard limit on the tracker's reservations - RuntimeProfile::Counter* reservation_limit; -}; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/reservation-tracker-test.cc ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/reservation-tracker-test.cc b/be/src/bufferpool/reservation-tracker-test.cc deleted file mode 100644 index 66ce287..0000000 --- a/be/src/bufferpool/reservation-tracker-test.cc +++ /dev/null @@ -1,378 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include <limits> -#include <string> -#include <vector> - -#include "bufferpool/reservation-tracker.h" -#include "common/init.h" -#include "common/object-pool.h" -#include "runtime/mem-tracker.h" -#include "testutil/gtest-util.h" - -#include "common/names.h" - -namespace impala { - -class ReservationTrackerTest : public ::testing::Test { - public: - virtual void SetUp() {} - - virtual void TearDown() { - root_.Close(); - obj_pool_.Clear(); - } - - /// The minimum allocation size used in most tests. - const static int64_t MIN_BUFFER_LEN = 1024; - - protected: - RuntimeProfile* NewProfile() { - return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile")); - } - - ObjectPool obj_pool_; - - ReservationTracker root_; - - scoped_ptr<RuntimeProfile> profile_; -}; - -const int64_t ReservationTrackerTest::MIN_BUFFER_LEN; - -TEST_F(ReservationTrackerTest, BasicSingleTracker) { - const int64_t limit = 16; - root_.InitRootTracker(NULL, limit); - ASSERT_EQ(0, root_.GetReservation()); - ASSERT_EQ(0, root_.GetUsedReservation()); - - // Fail to increase reservation. - ASSERT_FALSE(root_.IncreaseReservation(limit + 1)); - ASSERT_EQ(0, root_.GetReservation()); - ASSERT_EQ(0, root_.GetUsedReservation()); - ASSERT_EQ(0, root_.GetUnusedReservation()); - - // Successfully increase reservation. - ASSERT_TRUE(root_.IncreaseReservation(limit - 1)); - ASSERT_EQ(limit - 1, root_.GetReservation()); - ASSERT_EQ(0, root_.GetUsedReservation()); - ASSERT_EQ(limit - 1, root_.GetUnusedReservation()); - - // Adjust usage. 
- root_.AllocateFrom(2); - ASSERT_EQ(limit - 1, root_.GetReservation()); - ASSERT_EQ(2, root_.GetUsedReservation()); - ASSERT_EQ(limit - 3, root_.GetUnusedReservation()); - root_.ReleaseTo(1); - ASSERT_EQ(1, root_.GetUsedReservation()); - root_.ReleaseTo(1); - ASSERT_EQ(0, root_.GetUsedReservation()); - ASSERT_EQ(limit - 1, root_.GetReservation()); - ASSERT_EQ(limit - 1, root_.GetUnusedReservation()); -} - -TEST_F(ReservationTrackerTest, BasicTwoLevel) { - const int64_t limit = 16; - root_.InitRootTracker(NULL, limit); - - const int64_t root_reservation = limit / 2; - // Get half of the limit as an initial reservation. - ASSERT_TRUE(root_.IncreaseReservation(root_reservation)); - ASSERT_EQ(root_reservation, root_.GetReservation()); - ASSERT_EQ(root_reservation, root_.GetUnusedReservation()); - ASSERT_EQ(0, root_.GetUsedReservation()); - ASSERT_EQ(0, root_.GetChildReservations()); - - ReservationTracker child; - child.InitChildTracker(NULL, &root_, NULL, numeric_limits<int64_t>::max()); - - const int64_t child_reservation = root_reservation + 1; - // Get parent's reservation plus a bit more. - ASSERT_TRUE(child.IncreaseReservation(child_reservation)); - - ASSERT_EQ(child_reservation, root_.GetReservation()); - ASSERT_EQ(0, root_.GetUnusedReservation()); - ASSERT_EQ(0, root_.GetUsedReservation()); - ASSERT_EQ(child_reservation, root_.GetChildReservations()); - - ASSERT_EQ(child_reservation, child.GetReservation()); - ASSERT_EQ(child_reservation, child.GetUnusedReservation()); - ASSERT_EQ(0, child.GetUsedReservation()); - ASSERT_EQ(0, child.GetChildReservations()); - - // Check that child allocation is reflected correctly. - child.AllocateFrom(1); - ASSERT_EQ(child_reservation, child.GetReservation()); - ASSERT_EQ(1, child.GetUsedReservation()); - ASSERT_EQ(0, root_.GetUsedReservation()); - - // Check that parent reservation increase is reflected correctly. 
- ASSERT_TRUE(root_.IncreaseReservation(1)); - ASSERT_EQ(child_reservation + 1, root_.GetReservation()); - ASSERT_EQ(1, root_.GetUnusedReservation()); - ASSERT_EQ(0, root_.GetUsedReservation()); - ASSERT_EQ(child_reservation, root_.GetChildReservations()); - ASSERT_EQ(child_reservation, child.GetReservation()); - - // Check that parent allocation is reflected correctly. - root_.AllocateFrom(1); - ASSERT_EQ(child_reservation + 1, root_.GetReservation()); - ASSERT_EQ(0, root_.GetUnusedReservation()); - ASSERT_EQ(1, root_.GetUsedReservation()); - - // Release allocations. - root_.ReleaseTo(1); - ASSERT_EQ(0, root_.GetUsedReservation()); - child.ReleaseTo(1); - ASSERT_EQ(0, child.GetUsedReservation()); - - // Child reservation should be returned all the way up the tree. - child.DecreaseReservation(1); - ASSERT_EQ(child_reservation, root_.GetReservation()); - ASSERT_EQ(child_reservation - 1, child.GetReservation()); - ASSERT_EQ(child_reservation - 1, root_.GetChildReservations()); - - // Closing the child should release its reservation. - child.Close(); - ASSERT_EQ(1, root_.GetReservation()); - ASSERT_EQ(0, root_.GetChildReservations()); -} - -TEST_F(ReservationTrackerTest, CloseIdempotency) { - // Check we can close before opening. - root_.Close(); - - const int64_t limit = 16; - root_.InitRootTracker(NULL, limit); - - // Check we can close twice - root_.Close(); - root_.Close(); -} - -// Test that the tracker's reservation limit is enforced. -TEST_F(ReservationTrackerTest, ReservationLimit) { - Status status; - // Setup trackers so that there is a spare buffer that the client isn't entitled to. - int64_t total_mem = MIN_BUFFER_LEN * 3; - int64_t client_limit = MIN_BUFFER_LEN * 2; - root_.InitRootTracker(NULL, total_mem); - - ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); - client_tracker->InitChildTracker(NULL, &root_, NULL, client_limit); - - // We can only increase reservation up to the client's limit. 
- ASSERT_FALSE(client_tracker->IncreaseReservation(client_limit + 1)); - ASSERT_TRUE(client_tracker->IncreaseReservation(client_limit)); - ASSERT_FALSE(client_tracker->IncreaseReservation(1)); - - client_tracker->Close(); -} - -// Test that parent's reservation limit is enforced. -TEST_F(ReservationTrackerTest, ParentReservationLimit) { - Status status; - // Setup reservations so that there is a spare buffer. - int64_t total_mem = MIN_BUFFER_LEN * 4; - int64_t parent_limit = MIN_BUFFER_LEN * 3; - int64_t other_client_reservation = MIN_BUFFER_LEN; - root_.InitRootTracker(NULL, total_mem); - - // The child reservation limit is higher than the parent reservation limit, so the - // parent limit is the effective limit. - ReservationTracker* query_tracker = obj_pool_.Add(new ReservationTracker()); - ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); - query_tracker->InitChildTracker(NULL, &root_, NULL, parent_limit); - client_tracker->InitChildTracker(NULL, query_tracker, NULL, total_mem * 10); - - ReservationTracker* other_client_tracker = obj_pool_.Add(new ReservationTracker()); - other_client_tracker->InitChildTracker(NULL, query_tracker, NULL, total_mem); - ASSERT_TRUE(other_client_tracker->IncreaseReservationToFit(other_client_reservation)); - ASSERT_EQ(root_.GetUsedReservation(), 0); - ASSERT_EQ(root_.GetChildReservations(), other_client_reservation); - ASSERT_EQ(query_tracker->GetUsedReservation(), 0); - ASSERT_EQ(query_tracker->GetUnusedReservation(), 0); - - // Can only increase reservation up to parent limit, excluding other reservations. - int64_t effective_limit = parent_limit - other_client_reservation; - ASSERT_FALSE(client_tracker->IncreaseReservation(effective_limit + MIN_BUFFER_LEN)); - ASSERT_TRUE(client_tracker->IncreaseReservation(effective_limit)); - ASSERT_FALSE(client_tracker->IncreaseReservation(MIN_BUFFER_LEN)); - - // Check that tracker hierarchy reports correct usage. 
- ASSERT_EQ(root_.GetUsedReservation(), 0); - ASSERT_EQ(root_.GetChildReservations(), parent_limit); - ASSERT_EQ(query_tracker->GetUsedReservation(), 0); - ASSERT_EQ(query_tracker->GetUnusedReservation(), 0); - - client_tracker->Close(); - other_client_tracker->Close(); - query_tracker->Close(); -} - -/// Test integration of ReservationTracker with MemTracker. -TEST_F(ReservationTrackerTest, MemTrackerIntegrationTwoLevel) { - // Setup a 2-level hierarchy of trackers. The child ReservationTracker is linked to - // the child MemTracker. We add various limits at different places to enable testing - // of different code paths. - root_.InitRootTracker(NewProfile(), MIN_BUFFER_LEN * 100); - MemTracker root_mem_tracker; - MemTracker child_mem_tracker1(-1, "Child 1", &root_mem_tracker); - MemTracker child_mem_tracker2(MIN_BUFFER_LEN * 50, "Child 2", &root_mem_tracker); - ReservationTracker child_reservations1, child_reservations2; - child_reservations1.InitChildTracker( - NewProfile(), &root_, &child_mem_tracker1, 500 * MIN_BUFFER_LEN); - child_reservations2.InitChildTracker( - NewProfile(), &root_, &child_mem_tracker2, 75 * MIN_BUFFER_LEN); - - // Check that a single buffer reservation is accounted correctly. - ASSERT_TRUE(child_reservations1.IncreaseReservation(MIN_BUFFER_LEN)); - ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation()); - ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption()); - ASSERT_EQ(MIN_BUFFER_LEN, root_mem_tracker.consumption()); - ASSERT_EQ(MIN_BUFFER_LEN, root_.GetChildReservations()); - - // Check that a buffer reservation from the other child is accounted correctly. 
- ASSERT_TRUE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN)); - ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation()); - ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption()); - ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption()); - ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations()); - - // Check that hitting the MemTracker limit leaves things in a consistent state. - ASSERT_FALSE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN * 50)); - ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation()); - ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption()); - ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation()); - ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption()); - ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption()); - ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations()); - - // Check that hitting the ReservationTracker's local limit leaves things in a - // consistent state. - ASSERT_FALSE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN * 75)); - ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation()); - ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption()); - ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation()); - ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption()); - ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption()); - ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations()); - - // Check that hitting the ReservationTracker's parent's limit after the - // MemTracker consumption is incremented leaves things in a consistent state. 
- ASSERT_FALSE(child_reservations1.IncreaseReservation(MIN_BUFFER_LEN * 100)); - ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation()); - ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption()); - ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation()); - ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption()); - ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption()); - ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations()); - - // Check that released memory is decremented from all trackers correctly. - child_reservations1.DecreaseReservation(MIN_BUFFER_LEN); - child_reservations2.DecreaseReservation(MIN_BUFFER_LEN); - ASSERT_EQ(0, child_reservations2.GetReservation()); - ASSERT_EQ(0, child_mem_tracker2.consumption()); - ASSERT_EQ(0, root_mem_tracker.consumption()); - ASSERT_EQ(0, root_.GetUsedReservation()); - - child_reservations1.Close(); - child_reservations2.Close(); - child_mem_tracker1.UnregisterFromParent(); - child_mem_tracker2.UnregisterFromParent(); -} - -TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) { - const int HIERARCHY_DEPTH = 5; - // Setup a multi-level hierarchy of trackers and ensure that consumption is reported - // correctly. - ReservationTracker reservations[HIERARCHY_DEPTH]; - scoped_ptr<MemTracker> mem_trackers[HIERARCHY_DEPTH]; - - // We can only handle MemTracker limits at the topmost linked ReservationTracker, - // so avoid adding limits at lower level. - const int LIMIT = HIERARCHY_DEPTH; - vector<int> mem_limits({LIMIT * 10, LIMIT, -1, -1, -1}); - - // Root trackers aren't linked. 
- mem_trackers[0].reset(new MemTracker(mem_limits[0])); - reservations[0].InitRootTracker(NewProfile(), 500); - for (int i = 1; i < HIERARCHY_DEPTH; ++i) { - mem_trackers[i].reset(new MemTracker( - mem_limits[i], Substitute("Child $0", i), mem_trackers[i - 1].get())); - reservations[i].InitChildTracker( - NewProfile(), &reservations[i - 1], mem_trackers[i].get(), 500); - } - - vector<int> interesting_amounts({LIMIT - 1, LIMIT, LIMIT + 1}); - - // Test that all limits and consumption correctly reported when consuming - // from a non-root ReservationTracker that is connected to a MemTracker. - for (int level = 1; level < HIERARCHY_DEPTH; ++level) { - int64_t lowest_limit = mem_trackers[level]->lowest_limit(); - for (int amount : interesting_amounts) { - bool increased = reservations[level].IncreaseReservation(amount); - if (lowest_limit == -1 || amount <= lowest_limit) { - // The increase should go through. - ASSERT_TRUE(increased) << reservations[level].DebugString(); - ASSERT_EQ(amount, reservations[level].GetReservation()); - ASSERT_EQ(amount, mem_trackers[level]->consumption()); - for (int ancestor = 0; ancestor < level; ++ancestor) { - ASSERT_EQ(amount, reservations[ancestor].GetChildReservations()); - ASSERT_EQ(amount, mem_trackers[ancestor]->consumption()); - } - - LOG(INFO) << "\n" << mem_trackers[0]->LogUsage(); - reservations[level].DecreaseReservation(amount); - } else { - ASSERT_FALSE(increased); - } - // We should be back in the original state. - for (int i = 0; i < HIERARCHY_DEPTH; ++i) { - ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": " - << reservations[i].DebugString(); - ASSERT_EQ(0, reservations[i].GetChildReservations()); - ASSERT_EQ(0, mem_trackers[i]->consumption()); - } - } - } - - // "Pull down" a reservation from the top of the hierarchy level-by-level to the - // leaves, checking that everything is consistent at each step. 
- for (int level = 0; level < HIERARCHY_DEPTH; ++level) { - const int amount = LIMIT; - ASSERT_TRUE(reservations[level].IncreaseReservation(amount)); - ASSERT_EQ(amount, reservations[level].GetReservation()); - ASSERT_EQ(0, reservations[level].GetUsedReservation()); - if (level != 0) ASSERT_EQ(amount, mem_trackers[level]->consumption()); - for (int ancestor = 0; ancestor < level; ++ancestor) { - ASSERT_EQ(0, reservations[ancestor].GetUsedReservation()); - ASSERT_EQ(amount, reservations[ancestor].GetChildReservations()); - ASSERT_EQ(amount, mem_trackers[ancestor]->consumption()); - } - reservations[level].DecreaseReservation(amount); - } - - for (int i = HIERARCHY_DEPTH - 1; i >= 0; --i) { - reservations[i].Close(); - if (i != 0) mem_trackers[i]->UnregisterFromParent(); - } -} -} - -IMPALA_TEST_MAIN();
