http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/buffer-pool.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h new file mode 100644 index 0000000..7839612 --- /dev/null +++ b/be/src/runtime/bufferpool/buffer-pool.h @@ -0,0 +1,426 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_BUFFER_POOL_H +#define IMPALA_RUNTIME_BUFFER_POOL_H + +#include <stdint.h> +#include <boost/scoped_ptr.hpp> +#include <boost/thread/locks.hpp> +#include <string> + +#include "runtime/bufferpool/buffer-allocator.h" +#include "runtime/bufferpool/buffer-pool-counters.h" +#include "common/atomic.h" +#include "common/status.h" +#include "gutil/macros.h" +#include "util/internal-queue.h" +#include "util/spinlock.h" + +namespace impala { + +class BufferAllocator; +class ReservationTracker; + +/// A buffer pool that manages memory buffers for all queries in an Impala daemon. +/// The buffer pool enforces buffer reservations, limits, and implements policies +/// for moving spilled memory from in-memory buffers to disk. 
It also enables reuse of +/// buffers between queries, to avoid frequent allocations. +/// +/// The buffer pool can be used for allocating any large buffers (above a configurable +/// minimum length), whether or not the buffers will be spilled. Smaller allocations +/// are not serviced directly by the buffer pool: clients of the buffer pool must +/// subdivide buffers if they wish to use smaller allocations. +/// +/// All buffer pool operations are in the context of a registered buffer pool client. +/// A buffer pool client should be created for every allocator of buffers at the level +/// of granularity required for reporting and enforcement of reservations, e.g. an exec +/// node. The client tracks buffer reservations via its ReservationTracker and also +/// includes info that is helpful for debugging (e.g. the operator that is associated +/// with the buffer). The client is not threadsafe, i.e. concurrent buffer pool +/// operations should not be invoked for the same client. +/// +/// TODO: +/// * Implement spill-to-disk. +/// * Decide on, document, and enforce upper limits on page size. +/// +/// Pages, Buffers and Pinning +/// ========================== +/// * A page is a logical block of memory that can reside in memory or on disk. +/// * A buffer is a physical block of memory that can hold a page in memory. +/// * A page handle is used by buffer pool clients to identify and access a page and +/// the corresponding buffer. Clients do not interact with pages directly. +/// * A buffer handle is used by buffer pool clients to identify and access a buffer. +/// * A page is pinned if it has pin count > 0. A pinned page stays mapped to the same +/// buffer. +/// * An unpinned page can be written out to disk by the buffer pool so that the buffer +/// can be used for another purpose. +/// +/// Buffer/Page Sizes +/// ================= +/// The buffer pool has a minimum buffer size, which must be a power-of-two. 
Page and +/// buffer sizes must be an exact power-of-two multiple of the minimum buffer size. +/// +/// Reservations +/// ============ +/// Before allocating buffers or pinning pages, a client must reserve memory through its +/// ReservationTracker. Reservation of n bytes gives a client the right to allocate +/// buffers or pin pages summing up to n bytes. Reservations are both necessary and +/// sufficient for a client to allocate buffers or pin pages: the operations succeed +/// unless a "system error" such as a disk write error is encountered that prevents +/// unpinned pages from being written to disk. +/// +/// More memory may be reserved than is used, e.g. if a client is not using its full +/// reservation. In such cases, the buffer pool can use the free buffers in any way, +/// e.g. for keeping unpinned pages in memory, so long as it is able to fulfill the +/// reservations when needed, e.g. by flushing unpinned pages to disk. +/// +/// Page/Buffer Handles +/// =================== +/// The buffer pool exposes PageHandles and BufferHandles, which are owned by clients of +/// the buffer pool, and act as a proxy for the internal data structure representing the +/// page or buffer in the buffer pool. Handles are "open" if they are associated with a +/// page or buffer. An open PageHandle is obtained by creating a page. PageHandles are +/// closed by calling BufferPool::DestroyPage(). An open BufferHandle is obtained by +/// allocating a buffer or extracting a BufferHandle from a PageHandle. A page's buffer +/// can also be accessed through the PageHandle. The handle destructors check for +/// resource leaks, e.g. an open handle that would result in a buffer leak. +/// +/// Pin Counting of Page Handles: +/// ---------------------------------- +/// Page handles are scoped to a client. The invariants are as follows: +/// * A page can only be accessed through an open handle. +/// * A page is destroyed once the handle is destroyed via DestroyPage(). 
+/// * A page's buffer can only be accessed through a pinned handle. +/// * Pin() can be called on an open handle, incrementing the handle's pin count. +/// * Unpin() can be called on a pinned handle, but not an unpinned handle. +/// * Pin() always increases usage of reservations, and Unpin() always decreases usage, +/// i.e. the handle consumes <pin count> * <page size> bytes of reservation. +/// +/// Example Usage: Buffers +/// ================================== +/// The simplest use case is to allocate a memory buffer. +/// * The new buffer is created with AllocateBuffer(). +/// * The client reads and writes to the buffer as it sees fit. +/// * If the client is done with the buffer's contents it can call FreeBuffer() to +/// destroy the handle and free the buffer, or use TransferBuffer() to transfer +/// the buffer to a different client. +/// +/// Example Usage: Spillable Pages +/// ============================== +/// * A spilling operator creates a new page with CreatePage(). +/// * The client reads and writes to the page's buffer as it sees fit. +/// * If the operator encounters memory pressure, it can decrease reservation usage by +/// calling Unpin() on the page. The page may then be written to disk and its buffer +/// repurposed internally by BufferPool. +/// * Once the operator needs the page's contents again and has sufficient unused +/// reservation, it can call Pin(), which brings the page's contents back into memory, +/// perhaps in a different buffer. Therefore the operator must fix up any pointers into +/// the previous buffer. +/// * If the operator is done with the page, it can call DestroyPage() to destroy the +/// handle and release resources, or call ExtractBuffer() to extract the buffer. +/// +/// Synchronization +/// =============== +/// The data structures in the buffer pool itself are thread-safe. 
Client-owned data +/// structures - Client, PageHandle and BufferHandle - are not protected from concurrent +/// access by the buffer pool: clients must ensure that they do not invoke concurrent +/// operations with the same Client, PageHandle or BufferHandle. +// +/// +========================+ +/// | IMPLEMENTATION DETAILS | +/// +========================+ +/// +/// Lock Ordering +/// ============= +/// The lock ordering is: +/// * pages_::lock_ -> Page::lock_ +/// +/// If a reference to a page is acquired via the pages_ list, pages_::lock_ must be held +/// until done with the page to ensure the page isn't concurrently deleted. +class BufferPool { + public: + class BufferHandle; + class Client; + class PageHandle; + + /// Constructs a new buffer pool. + /// 'min_buffer_len': the minimum buffer length for the pool. Must be a power of two. + /// 'buffer_bytes_limit': the maximum physical memory in bytes that can be used by the + /// buffer pool. If 'buffer_bytes_limit' is not a multiple of 'min_buffer_len', the + /// remainder will not be usable. + BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit); + ~BufferPool(); + + /// Register a client. Returns an error status and does not register the client if the + /// arguments are invalid. 'name' is an arbitrary name used to identify the client in + /// any error messages or logging. Counters for this client are added to the (non-NULL) + /// 'profile'. 'client' is the client to register. 'client' should not already be + /// registered. + Status RegisterClient(const std::string& name, ReservationTracker* reservation, + RuntimeProfile* profile, Client* client); + + /// Deregister 'client' if it is registered. Idempotent. + void DeregisterClient(Client* client); + + /// Create a new page of 'len' bytes with pin count 1. 'len' must be a page length + /// supported by BufferPool (see BufferPool class comment). 
The client must have + /// sufficient unused reservation to pin the new page (otherwise it will DCHECK). + /// CreatePage() only fails when a system error prevents the buffer pool from fulfilling + /// the reservation. + /// On success, the handle is mapped to the new page. + Status CreatePage(Client* client, int64_t len, PageHandle* handle); + + /// Increment the pin count of 'handle'. After Pin() the underlying page will + /// be mapped to a buffer, which will be accessible through 'handle'. Uses + /// reservation from 'client'. The caller is responsible for ensuring it has enough + /// unused reservation before calling Pin() (otherwise it will DCHECK). Pin() only + /// fails when a system error prevents the buffer pool from fulfilling the reservation. + /// 'handle' must be open. + Status Pin(Client* client, PageHandle* handle); + + /// Decrement the pin count of 'handle'. Decrease client's reservation usage. If the + /// handle's pin count becomes zero, it is no longer valid for the underlying page's + /// buffer to be accessed via 'handle'. If the page's total pin count across all + /// handles that reference it goes to zero, the page's data may be written to disk and + /// the buffer reclaimed. 'handle' must be open and have a pin count > 0. + /// TODO: once we implement spilling, it will be an error to call Unpin() with + /// spilling disabled. E.g. if Impala is running without scratch (we want to be + /// able to test Unpin() before we implement actual spilling). + void Unpin(Client* client, PageHandle* handle); + + /// Destroy the page referenced by 'handle' (if 'handle' is open). Any buffers or disk + /// storage backing the page are freed. Idempotent. If the page is pinned, the + /// reservation usage is decreased accordingly. + void DestroyPage(Client* client, PageHandle* handle); + + /// Extracts buffer from a pinned page. 
After this returns, the page referenced by + /// 'page_handle' will be destroyed and 'buffer_handle' will reference the buffer from + /// 'page_handle'. This may decrease reservation usage of 'client' if the page was + /// pinned multiple times via 'page_handle'. + void ExtractBuffer( + Client* client, PageHandle* page_handle, BufferHandle* buffer_handle); + + /// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. The caller + /// is responsible for ensuring it has enough unused reservation before calling + /// AllocateBuffer() (otherwise it will DCHECK). AllocateBuffer() only fails when + /// a system error prevents the buffer pool from fulfilling the reservation. + Status AllocateBuffer(Client* client, int64_t len, BufferHandle* handle); + + /// If 'handle' is open, close 'handle', free the buffer and decrease the + /// reservation usage from 'client'. Idempotent. + void FreeBuffer(Client* client, BufferHandle* handle); + + /// Transfer ownership of buffer from 'src_client' to 'dst_client' and move the + /// handle from 'src' to 'dst'. Increases reservation usage in 'dst_client' and + /// decreases reservation usage in 'src_client'. 'src' must be open and 'dst' must be + /// closed before calling. 'src'/'dst' and 'src_client'/'dst_client' must be different. + /// After a successful call, 'src' is closed and 'dst' is open. + Status TransferBuffer(Client* src_client, BufferHandle* src, Client* dst_client, + BufferHandle* dst); + + /// Print a debug string with the state of the buffer pool. + std::string DebugString(); + + int64_t min_buffer_len() const; + int64_t buffer_bytes_limit() const; + + private: + DISALLOW_COPY_AND_ASSIGN(BufferPool); + struct Page; + + /// Same as Unpin(), except the lock for the page referenced by 'handle' must be held + /// by the caller. + void UnpinLocked(Client* client, PageHandle* handle); + + /// Perform the cleanup of the page object and handle when the page is destroyed. 
+ /// Reset 'handle', free the Page object and remove the 'pages_' entry. + /// The 'handle->page_' lock should *not* be held by the caller. + void CleanUpPage(PageHandle* handle); + + /// Allocate a buffer of length 'len'. Assumes that the client's reservation has already + /// been consumed for the buffer. Returns an error if the pool is unable to fulfill the + /// reservation. + Status AllocateBufferInternal(Client* client, int64_t len, BufferHandle* buffer); + + /// Frees 'buffer', which must be open before calling. Closes 'buffer' and updates + /// internal state but does not release any reservation. + void FreeBufferInternal(BufferHandle* buffer); + + /// Check if we can allocate another buffer of size 'len' bytes without + /// 'buffer_bytes_remaining_' going negative. + /// Returns true and decreases 'buffer_bytes_remaining_' by 'len' if successful. + bool TryDecreaseBufferBytesRemaining(int64_t len); + + /// Allocator for allocating and freeing all buffer memory. + boost::scoped_ptr<BufferAllocator> allocator_; + + /// The minimum length of a buffer in bytes. All buffers and pages are a power-of-two + /// multiple of this length. This is always a power of two. + const int64_t min_buffer_len_; + + /// The maximum physical memory in bytes that can be used for buffers. + const int64_t buffer_bytes_limit_; + + /// The remaining number of bytes of 'buffer_bytes_limit_' that can be used for + /// allocating new buffers. Must be updated atomically before a new buffer is + /// allocated or after an existing buffer is freed. + AtomicInt64 buffer_bytes_remaining_; + + /// List containing all pages. Protected by the list's internal lock. + typedef InternalQueue<Page> PageList; + PageList pages_; +}; + +/// External representation of a client of the BufferPool. Clients are used for +/// reservation accounting, and will be used in the future for tracking per-client +/// buffer pool counters. This class is the external handle for a client so 
This class is the external handle for a client so +/// each Client instance is owned by the BufferPool's client, rather than the BufferPool. +/// Each Client should only be used by a single thread at a time: concurrently calling +/// Client methods or BufferPool methods with the Client as an argument is not supported. +class BufferPool::Client { + public: + Client() : reservation_(NULL) {} + /// Client must be deregistered. + ~Client() { DCHECK(!is_registered()); } + + bool is_registered() const { return reservation_ != NULL; } + ReservationTracker* reservation() { return reservation_; } + + std::string DebugString() const; + + private: + friend class BufferPool; + DISALLOW_COPY_AND_ASSIGN(Client); + + /// Initialize 'counters_' and add the counters to 'profile'. + void InitCounters(RuntimeProfile* profile); + + /// A name identifying the client. + std::string name_; + + /// The reservation tracker for the client. NULL means the client isn't registered. + /// All pages pinned by the client count as usage against 'reservation_'. + ReservationTracker* reservation_; + + /// The RuntimeProfile counters for this client. All non-NULL if is_registered() + /// is true. + BufferPoolClientCounters counters_; +}; + +/// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only +/// be used by a single thread at a time: concurrently calling BufferHandle methods or +/// BufferPool methods with the BufferHandle as an argument is not supported. +class BufferPool::BufferHandle { + public: + BufferHandle(); + ~BufferHandle() { DCHECK(!is_open()); } + + /// Allow move construction of handles, to support std::move(). + BufferHandle(BufferHandle&& src); + + /// Allow move assignment of handles, to support STL classes like std::vector. + /// Destination must be uninitialized. 
+ BufferHandle& operator=(BufferHandle&& src); + + bool is_open() const { return data_ != NULL; } + int64_t len() const { + DCHECK(is_open()); + return len_; + } + /// Get a pointer to the start of the buffer. + uint8_t* data() const { + DCHECK(is_open()); + return data_; + } + + std::string DebugString() const; + + private: + DISALLOW_COPY_AND_ASSIGN(BufferHandle); + friend class BufferPool; + + /// Internal helper to set the handle to an opened state. + void Open(const Client* client, uint8_t* data, int64_t len); + + /// Internal helper to reset the handle to an unopened state. + void Reset(); + + /// The client the buffer handle belongs to, used to validate that the correct client + /// is provided in BufferPool method calls. + const Client* client_; + + /// Pointer to the start of the buffer. Non-NULL if open, NULL if closed. + uint8_t* data_; + + /// Length of the buffer in bytes. + int64_t len_; +}; + +/// The handle for a page used by clients of the BufferPool. Each PageHandle should +/// only be used by a single thread at a time: concurrently calling PageHandle methods +/// or BufferPool methods with the PageHandle as an argument is not supported. +class BufferPool::PageHandle { + public: + PageHandle(); + ~PageHandle() { DCHECK(!is_open()); } + + // Allow move construction of page handles, to support std::move(). + PageHandle(PageHandle&& src); + + // Allow move assignment of page handles, to support STL classes like std::vector. + // Destination must be closed. + PageHandle& operator=(PageHandle&& src); + + bool is_open() const { return page_ != NULL; } + bool is_pinned() const { return pin_count() > 0; } + int pin_count() const; + int64_t len() const; + /// Get a pointer to the start of the page's buffer. Only valid to call if the page + /// is pinned via this handle. + uint8_t* data() const { return buffer_handle()->data(); } + + /// Return a pointer to the page's buffer handle. Only valid to call if the page is + /// pinned via this handle. 
Only const accessors of the returned handle can be used: + /// it is invalid to call FreeBuffer() or TransferBuffer() on it or to otherwise modify + /// the handle. + const BufferHandle* buffer_handle() const; + + std::string DebugString() const; + + private: + DISALLOW_COPY_AND_ASSIGN(PageHandle); + friend class BufferPool; + friend class Page; + + /// Internal helper to open the handle for the given page. + void Open(Page* page, Client* client); + + /// Internal helper to reset the handle to an unopened state. + void Reset(); + + /// The internal page structure. NULL if the handle is not open. + Page* page_; + + /// The client the page handle belongs to, used to validate that the correct client + /// is being used. + const Client* client_; +}; + +} + +#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/reservation-tracker-counters.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-tracker-counters.h b/be/src/runtime/bufferpool/reservation-tracker-counters.h new file mode 100644 index 0000000..8124e10 --- /dev/null +++ b/be/src/runtime/bufferpool/reservation-tracker-counters.h @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_RESERVATION_TRACKER_COUNTERS_H +#define IMPALA_RUNTIME_RESERVATION_TRACKER_COUNTERS_H + +#include "util/runtime-profile.h" + +namespace impala { + +/// A set of counters for each ReservationTracker for reporting purposes. +/// +/// If the ReservationTracker is linked to a profile these have the same lifetime as that +/// profile, otherwise they have the same lifetime as the ReservationTracker itself. +struct ReservationTrackerCounters { + /// The tracker's peak reservation in bytes. + RuntimeProfile::HighWaterMarkCounter* peak_reservation; + + /// The tracker's peak usage in bytes. 
+ RuntimeProfile::HighWaterMarkCounter* peak_used_reservation; + + /// The hard limit on the tracker's reservations + RuntimeProfile::Counter* reservation_limit; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/reservation-tracker-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc new file mode 100644 index 0000000..416a53e --- /dev/null +++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc @@ -0,0 +1,378 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <limits> +#include <string> +#include <vector> + +#include "runtime/bufferpool/reservation-tracker.h" +#include "common/init.h" +#include "common/object-pool.h" +#include "runtime/mem-tracker.h" +#include "testutil/gtest-util.h" + +#include "common/names.h" + +namespace impala { + +class ReservationTrackerTest : public ::testing::Test { + public: + virtual void SetUp() {} + + virtual void TearDown() { + root_.Close(); + obj_pool_.Clear(); + } + + /// The minimum allocation size used in most tests. 
+ const static int64_t MIN_BUFFER_LEN = 1024; + + protected: + RuntimeProfile* NewProfile() { + return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile")); + } + + ObjectPool obj_pool_; + + ReservationTracker root_; + + scoped_ptr<RuntimeProfile> profile_; +}; + +const int64_t ReservationTrackerTest::MIN_BUFFER_LEN; + +TEST_F(ReservationTrackerTest, BasicSingleTracker) { + const int64_t limit = 16; + root_.InitRootTracker(NULL, limit); + ASSERT_EQ(0, root_.GetReservation()); + ASSERT_EQ(0, root_.GetUsedReservation()); + + // Fail to increase reservation. + ASSERT_FALSE(root_.IncreaseReservation(limit + 1)); + ASSERT_EQ(0, root_.GetReservation()); + ASSERT_EQ(0, root_.GetUsedReservation()); + ASSERT_EQ(0, root_.GetUnusedReservation()); + + // Successfully increase reservation. + ASSERT_TRUE(root_.IncreaseReservation(limit - 1)); + ASSERT_EQ(limit - 1, root_.GetReservation()); + ASSERT_EQ(0, root_.GetUsedReservation()); + ASSERT_EQ(limit - 1, root_.GetUnusedReservation()); + + // Adjust usage. + root_.AllocateFrom(2); + ASSERT_EQ(limit - 1, root_.GetReservation()); + ASSERT_EQ(2, root_.GetUsedReservation()); + ASSERT_EQ(limit - 3, root_.GetUnusedReservation()); + root_.ReleaseTo(1); + ASSERT_EQ(1, root_.GetUsedReservation()); + root_.ReleaseTo(1); + ASSERT_EQ(0, root_.GetUsedReservation()); + ASSERT_EQ(limit - 1, root_.GetReservation()); + ASSERT_EQ(limit - 1, root_.GetUnusedReservation()); +} + +TEST_F(ReservationTrackerTest, BasicTwoLevel) { + const int64_t limit = 16; + root_.InitRootTracker(NULL, limit); + + const int64_t root_reservation = limit / 2; + // Get half of the limit as an initial reservation. 
+ ASSERT_TRUE(root_.IncreaseReservation(root_reservation)); + ASSERT_EQ(root_reservation, root_.GetReservation()); + ASSERT_EQ(root_reservation, root_.GetUnusedReservation()); + ASSERT_EQ(0, root_.GetUsedReservation()); + ASSERT_EQ(0, root_.GetChildReservations()); + + ReservationTracker child; + child.InitChildTracker(NULL, &root_, NULL, numeric_limits<int64_t>::max()); + + const int64_t child_reservation = root_reservation + 1; + // Get parent's reservation plus a bit more. + ASSERT_TRUE(child.IncreaseReservation(child_reservation)); + + ASSERT_EQ(child_reservation, root_.GetReservation()); + ASSERT_EQ(0, root_.GetUnusedReservation()); + ASSERT_EQ(0, root_.GetUsedReservation()); + ASSERT_EQ(child_reservation, root_.GetChildReservations()); + + ASSERT_EQ(child_reservation, child.GetReservation()); + ASSERT_EQ(child_reservation, child.GetUnusedReservation()); + ASSERT_EQ(0, child.GetUsedReservation()); + ASSERT_EQ(0, child.GetChildReservations()); + + // Check that child allocation is reflected correctly. + child.AllocateFrom(1); + ASSERT_EQ(child_reservation, child.GetReservation()); + ASSERT_EQ(1, child.GetUsedReservation()); + ASSERT_EQ(0, root_.GetUsedReservation()); + + // Check that parent reservation increase is reflected correctly. + ASSERT_TRUE(root_.IncreaseReservation(1)); + ASSERT_EQ(child_reservation + 1, root_.GetReservation()); + ASSERT_EQ(1, root_.GetUnusedReservation()); + ASSERT_EQ(0, root_.GetUsedReservation()); + ASSERT_EQ(child_reservation, root_.GetChildReservations()); + ASSERT_EQ(child_reservation, child.GetReservation()); + + // Check that parent allocation is reflected correctly. + root_.AllocateFrom(1); + ASSERT_EQ(child_reservation + 1, root_.GetReservation()); + ASSERT_EQ(0, root_.GetUnusedReservation()); + ASSERT_EQ(1, root_.GetUsedReservation()); + + // Release allocations. 
+ root_.ReleaseTo(1); + ASSERT_EQ(0, root_.GetUsedReservation()); + child.ReleaseTo(1); + ASSERT_EQ(0, child.GetUsedReservation()); + + // Child reservation should be returned all the way up the tree. + child.DecreaseReservation(1); + ASSERT_EQ(child_reservation, root_.GetReservation()); + ASSERT_EQ(child_reservation - 1, child.GetReservation()); + ASSERT_EQ(child_reservation - 1, root_.GetChildReservations()); + + // Closing the child should release its reservation. + child.Close(); + ASSERT_EQ(1, root_.GetReservation()); + ASSERT_EQ(0, root_.GetChildReservations()); +} + +TEST_F(ReservationTrackerTest, CloseIdempotency) { + // Check we can close before opening. + root_.Close(); + + const int64_t limit = 16; + root_.InitRootTracker(NULL, limit); + + // Check we can close twice + root_.Close(); + root_.Close(); +} + +// Test that the tracker's reservation limit is enforced. +TEST_F(ReservationTrackerTest, ReservationLimit) { + Status status; + // Setup trackers so that there is a spare buffer that the client isn't entitled to. + int64_t total_mem = MIN_BUFFER_LEN * 3; + int64_t client_limit = MIN_BUFFER_LEN * 2; + root_.InitRootTracker(NULL, total_mem); + + ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); + client_tracker->InitChildTracker(NULL, &root_, NULL, client_limit); + + // We can only increase reservation up to the client's limit. + ASSERT_FALSE(client_tracker->IncreaseReservation(client_limit + 1)); + ASSERT_TRUE(client_tracker->IncreaseReservation(client_limit)); + ASSERT_FALSE(client_tracker->IncreaseReservation(1)); + + client_tracker->Close(); +} + +// Test that parent's reservation limit is enforced. +TEST_F(ReservationTrackerTest, ParentReservationLimit) { + Status status; + // Setup reservations so that there is a spare buffer. 
+ int64_t total_mem = MIN_BUFFER_LEN * 4; + int64_t parent_limit = MIN_BUFFER_LEN * 3; + int64_t other_client_reservation = MIN_BUFFER_LEN; + root_.InitRootTracker(NULL, total_mem); + + // The child reservation limit is higher than the parent reservation limit, so the + // parent limit is the effective limit. + ReservationTracker* query_tracker = obj_pool_.Add(new ReservationTracker()); + ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); + query_tracker->InitChildTracker(NULL, &root_, NULL, parent_limit); + client_tracker->InitChildTracker(NULL, query_tracker, NULL, total_mem * 10); + + ReservationTracker* other_client_tracker = obj_pool_.Add(new ReservationTracker()); + other_client_tracker->InitChildTracker(NULL, query_tracker, NULL, total_mem); + ASSERT_TRUE(other_client_tracker->IncreaseReservationToFit(other_client_reservation)); + ASSERT_EQ(root_.GetUsedReservation(), 0); + ASSERT_EQ(root_.GetChildReservations(), other_client_reservation); + ASSERT_EQ(query_tracker->GetUsedReservation(), 0); + ASSERT_EQ(query_tracker->GetUnusedReservation(), 0); + + // Can only increase reservation up to parent limit, excluding other reservations. + int64_t effective_limit = parent_limit - other_client_reservation; + ASSERT_FALSE(client_tracker->IncreaseReservation(effective_limit + MIN_BUFFER_LEN)); + ASSERT_TRUE(client_tracker->IncreaseReservation(effective_limit)); + ASSERT_FALSE(client_tracker->IncreaseReservation(MIN_BUFFER_LEN)); + + // Check that tracker hierarchy reports correct usage. + ASSERT_EQ(root_.GetUsedReservation(), 0); + ASSERT_EQ(root_.GetChildReservations(), parent_limit); + ASSERT_EQ(query_tracker->GetUsedReservation(), 0); + ASSERT_EQ(query_tracker->GetUnusedReservation(), 0); + + client_tracker->Close(); + other_client_tracker->Close(); + query_tracker->Close(); +} + +/// Test integration of ReservationTracker with MemTracker. 
+TEST_F(ReservationTrackerTest, MemTrackerIntegrationTwoLevel) { + // Setup a 2-level hierarchy of trackers. The child ReservationTracker is linked to + // the child MemTracker. We add various limits at different places to enable testing + // of different code paths. + root_.InitRootTracker(NewProfile(), MIN_BUFFER_LEN * 100); + MemTracker root_mem_tracker; + MemTracker child_mem_tracker1(-1, "Child 1", &root_mem_tracker); + MemTracker child_mem_tracker2(MIN_BUFFER_LEN * 50, "Child 2", &root_mem_tracker); + ReservationTracker child_reservations1, child_reservations2; + child_reservations1.InitChildTracker( + NewProfile(), &root_, &child_mem_tracker1, 500 * MIN_BUFFER_LEN); + child_reservations2.InitChildTracker( + NewProfile(), &root_, &child_mem_tracker2, 75 * MIN_BUFFER_LEN); + + // Check that a single buffer reservation is accounted correctly. + ASSERT_TRUE(child_reservations1.IncreaseReservation(MIN_BUFFER_LEN)); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption()); + ASSERT_EQ(MIN_BUFFER_LEN, root_mem_tracker.consumption()); + ASSERT_EQ(MIN_BUFFER_LEN, root_.GetChildReservations()); + + // Check that a buffer reservation from the other child is accounted correctly. + ASSERT_TRUE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN)); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations()); + + // Check that hitting the MemTracker limit leaves things in a consistent state. + // Child 2 already has one buffer reserved, so 50 more would exceed the + // 50 * MIN_BUFFER_LEN limit that child_mem_tracker2 was constructed with.
+ ASSERT_FALSE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN * 50)); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption()); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations()); + + // Check that hitting the ReservationTracker's local limit leaves things in a + // consistent state. One buffer plus 75 more would exceed the 75 * MIN_BUFFER_LEN + // reservation limit of child_reservations2. + ASSERT_FALSE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN * 75)); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption()); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations()); + + // Check that hitting the limit of the ReservationTracker's parent leaves things in a + // consistent state. root_ already accounts for two buffers of child reservations and + // has a limit of 100 * MIN_BUFFER_LEN, so 100 more buffers cannot be granted even + // though child_reservations1's own limit and its MemTracker would allow them. + ASSERT_FALSE(child_reservations1.IncreaseReservation(MIN_BUFFER_LEN * 100)); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption()); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations()); + + // Check that released memory is decremented from all trackers correctly.
+ child_reservations1.DecreaseReservation(MIN_BUFFER_LEN); + child_reservations2.DecreaseReservation(MIN_BUFFER_LEN); + ASSERT_EQ(0, child_reservations2.GetReservation()); + ASSERT_EQ(0, child_mem_tracker2.consumption()); + ASSERT_EQ(0, root_mem_tracker.consumption()); + ASSERT_EQ(0, root_.GetUsedReservation()); + + child_reservations1.Close(); + child_reservations2.Close(); + child_mem_tracker1.UnregisterFromParent(); + child_mem_tracker2.UnregisterFromParent(); +} + +TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) { + const int HIERARCHY_DEPTH = 5; + // Setup a multi-level hierarchy of trackers and ensure that consumption is reported + // correctly. Level 0 is the root; every level i > 0 is a child of level i - 1 in both + // the ReservationTracker and the MemTracker hierarchy. + ReservationTracker reservations[HIERARCHY_DEPTH]; + scoped_ptr<MemTracker> mem_trackers[HIERARCHY_DEPTH]; + + // We can only handle MemTracker limits at the topmost linked ReservationTracker, + // so avoid adding limits at lower levels. + const int LIMIT = HIERARCHY_DEPTH; + vector<int> mem_limits({LIMIT * 10, LIMIT, -1, -1, -1}); + + // Root trackers aren't linked. + mem_trackers[0].reset(new MemTracker(mem_limits[0])); + reservations[0].InitRootTracker(NewProfile(), 500); + for (int i = 1; i < HIERARCHY_DEPTH; ++i) { + mem_trackers[i].reset(new MemTracker( + mem_limits[i], Substitute("Child $0", i), mem_trackers[i - 1].get())); + reservations[i].InitChildTracker( + NewProfile(), &reservations[i - 1], mem_trackers[i].get(), 500); + } + + vector<int> interesting_amounts({LIMIT - 1, LIMIT, LIMIT + 1}); + + // Test that all limits and consumption are correctly reported when consuming + // from a non-root ReservationTracker that is connected to a MemTracker. + for (int level = 1; level < HIERARCHY_DEPTH; ++level) { + int64_t lowest_limit = mem_trackers[level]->lowest_limit(); + for (int amount : interesting_amounts) { + bool increased = reservations[level].IncreaseReservation(amount); + if (lowest_limit == -1 || amount <= lowest_limit) { + // The increase should go through.
+ ASSERT_TRUE(increased) << reservations[level].DebugString(); + ASSERT_EQ(amount, reservations[level].GetReservation()); + ASSERT_EQ(amount, mem_trackers[level]->consumption()); + for (int ancestor = 0; ancestor < level; ++ancestor) { + ASSERT_EQ(amount, reservations[ancestor].GetChildReservations()); + ASSERT_EQ(amount, mem_trackers[ancestor]->consumption()); + } + + LOG(INFO) << "\n" << mem_trackers[0]->LogUsage(); + reservations[level].DecreaseReservation(amount); + } else { + ASSERT_FALSE(increased); + } + // We should be back in the original state. + for (int i = 0; i < HIERARCHY_DEPTH; ++i) { + ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": " + << reservations[i].DebugString(); + ASSERT_EQ(0, reservations[i].GetChildReservations()); + ASSERT_EQ(0, mem_trackers[i]->consumption()); + } + } + } + + // "Pull down" a reservation from the top of the hierarchy level-by-level to the + // leaves, checking that everything is consistent at each step. + // The root (level 0) has no linked MemTracker, so its own MemTracker consumption is + // only checked when it acts as an ancestor of a deeper level. + for (int level = 0; level < HIERARCHY_DEPTH; ++level) { + const int amount = LIMIT; + ASSERT_TRUE(reservations[level].IncreaseReservation(amount)); + ASSERT_EQ(amount, reservations[level].GetReservation()); + ASSERT_EQ(0, reservations[level].GetUsedReservation()); + if (level != 0) ASSERT_EQ(amount, mem_trackers[level]->consumption()); + for (int ancestor = 0; ancestor < level; ++ancestor) { + ASSERT_EQ(0, reservations[ancestor].GetUsedReservation()); + ASSERT_EQ(amount, reservations[ancestor].GetChildReservations()); + ASSERT_EQ(amount, mem_trackers[ancestor]->consumption()); + } + reservations[level].DecreaseReservation(amount); + } + + for (int i = HIERARCHY_DEPTH - 1; i >= 0; --i) { + reservations[i].Close(); + if (i != 0) mem_trackers[i]->UnregisterFromParent(); + } +} +} + +IMPALA_TEST_MAIN(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/reservation-tracker.cc ---------------------------------------------------------------------- diff --git
a/be/src/runtime/bufferpool/reservation-tracker.cc b/be/src/runtime/bufferpool/reservation-tracker.cc new file mode 100644 index 0000000..7138d69 --- /dev/null +++ b/be/src/runtime/bufferpool/reservation-tracker.cc @@ -0,0 +1,306 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/bufferpool/reservation-tracker.h" + +#include <algorithm> + +#include "common/object-pool.h" +#include "gutil/strings/substitute.h" +#include "runtime/mem-tracker.h" +#include "util/dummy-runtime-profile.h" +#include "util/runtime-profile-counters.h" + +#include "common/names.h" + +namespace impala { + +ReservationTracker::ReservationTracker() : initialized_(false), mem_tracker_(NULL) {} + +ReservationTracker::~ReservationTracker() { + DCHECK(!initialized_); +} + +void ReservationTracker::InitRootTracker( + RuntimeProfile* profile, int64_t reservation_limit) { + lock_guard<SpinLock> l(lock_); + DCHECK(!initialized_); + parent_ = NULL; + mem_tracker_ = NULL; + reservation_limit_ = reservation_limit; + reservation_ = 0; + used_reservation_ = 0; + child_reservations_ = 0; + initialized_ = true; + + InitCounters(profile, reservation_limit_); + COUNTER_SET(counters_.peak_reservation, reservation_); + + CheckConsistency(); +} + +void ReservationTracker::InitChildTracker(RuntimeProfile* profile, + ReservationTracker* parent, MemTracker* mem_tracker, int64_t reservation_limit) { + DCHECK(parent != NULL); + DCHECK_GE(reservation_limit, 0); + + lock_guard<SpinLock> l(lock_); + DCHECK(!initialized_); + parent_ = parent; + mem_tracker_ = mem_tracker; + + reservation_limit_ = reservation_limit; + reservation_ = 0; + used_reservation_ = 0; + child_reservations_ = 0; + initialized_ = true; + + if (mem_tracker_ != NULL) { + MemTracker* parent_mem_tracker = GetParentMemTracker(); + if (parent_mem_tracker != NULL) { + // Make sure the parent links of the MemTrackers correspond to our parent links. + DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent()); + // Make sure we don't have a lower limit than the ancestor, since we don't enforce + // limits at lower links. + DCHECK_EQ(mem_tracker_->lowest_limit(), parent_mem_tracker->lowest_limit()); + } else { + // Make sure we didn't leave a gap in the links. E.g.
this tracker's grandparent + // shouldn't have a MemTracker. + ReservationTracker* ancestor = parent_; + while (ancestor != NULL) { + DCHECK(ancestor->mem_tracker_ == NULL); + ancestor = ancestor->parent_; + } + } + } + + InitCounters(profile, reservation_limit_); + + CheckConsistency(); +} + +void ReservationTracker::InitCounters( + RuntimeProfile* profile, int64_t reservation_limit) { + bool profile_provided = profile != NULL; + if (profile == NULL) { + dummy_profile_.reset(new DummyProfile); + profile = dummy_profile_->profile(); + } + + // Check that another tracker's counters aren't already registered in the profile. + // Probe for a counter name that is actually registered below; probing for a name + // that no tracker ever adds would make this check unable to fire. + DCHECK(profile->GetCounter("BufferPoolReservationLimit") == NULL); + counters_.reservation_limit = + ADD_COUNTER(profile, "BufferPoolReservationLimit", TUnit::BYTES); + counters_.peak_reservation = + profile->AddHighWaterMarkCounter("BufferPoolPeakReservation", TUnit::BYTES); + counters_.peak_used_reservation = + profile->AddHighWaterMarkCounter("BufferPoolPeakUsedReservation", TUnit::BYTES); + + COUNTER_SET(counters_.reservation_limit, reservation_limit); + + // Reservation reporting through the linked MemTracker is only enabled when the + // caller supplied a profile (i.e. not for the internally created dummy profile). + if (mem_tracker_ != NULL && profile_provided) { + mem_tracker_->EnableReservationReporting(counters_); + } +} + +void ReservationTracker::Close() { + lock_guard<SpinLock> l(lock_); + if (!initialized_) return; + CheckConsistency(); + DCHECK_EQ(used_reservation_, 0); + DCHECK_EQ(child_reservations_, 0); + // Release any reservation to parent.
+ if (parent_ != NULL) DecreaseReservationInternalLocked(reservation_, false); + mem_tracker_ = NULL; + parent_ = NULL; + initialized_ = false; +} + +bool ReservationTracker::IncreaseReservation(int64_t bytes) { + lock_guard<SpinLock> l(lock_); + return IncreaseReservationInternalLocked(bytes, false, false); +} + +bool ReservationTracker::IncreaseReservationToFit(int64_t bytes) { + lock_guard<SpinLock> l(lock_); + return IncreaseReservationInternalLocked(bytes, true, false); +} + +bool ReservationTracker::IncreaseReservationInternalLocked( + int64_t bytes, bool use_existing_reservation, bool is_child_reservation) { + DCHECK(initialized_); + int64_t reservation_increase = + use_existing_reservation ? max<int64_t>(0, bytes - unused_reservation()) : bytes; + DCHECK_GE(reservation_increase, 0); + + bool granted; + // Check if the increase is allowed, starting at the bottom of hierarchy. + // The limit check is written as a subtraction: 'reservation_' never exceeds + // 'reservation_limit_', so the difference cannot overflow, whereas + // 'reservation_ + reservation_increase' could overflow int64_t for huge requests. + if (reservation_increase > reservation_limit_ - reservation_) { + granted = false; + } else if (reservation_increase == 0) { + granted = true; + } else { + if (parent_ == NULL) { + granted = true; + } else { + lock_guard<SpinLock> l(parent_->lock_); + granted = + parent_->IncreaseReservationInternalLocked(reservation_increase, true, true); + } + if (granted && !TryUpdateMemTracker(reservation_increase)) { + granted = false; + // Roll back changes to ancestors if MemTracker update fails. + // TryUpdateMemTracker() can only fail when a MemTracker is linked, and a root + // tracker never has one, so 'parent_' must be non-NULL on this path. + // NOTE(review): the rollback releases 'reservation_increase' from each ancestor, + // which looks like it can exceed what the ancestor newly acquired if it served + // part of the request from its previously unused reservation - confirm intended. + DCHECK(parent_ != NULL); + parent_->DecreaseReservationInternal(reservation_increase, true); + } + } + + if (granted) { + // The reservation was granted and state updated in all ancestors: we can modify + // this tracker's state now.
+ UpdateReservation(reservation_increase); + if (is_child_reservation) child_reservations_ += bytes; + } + + CheckConsistency(); + return granted; +} + +bool ReservationTracker::TryUpdateMemTracker(int64_t reservation_increase) { + if (mem_tracker_ == NULL) return true; + if (GetParentMemTracker() == NULL) { + // At the topmost link, which may be a MemTracker with a limit, we need to use + // TryConsume() to check the limit. + return mem_tracker_->TryConsume(reservation_increase); + } else { + // For lower links, there shouldn't be a limit to enforce, so we just need to + // update the consumption of the linked MemTracker since the reservation is + // already reflected in its parent. + mem_tracker_->ConsumeLocal(reservation_increase, GetParentMemTracker()); + return true; + } +} + +void ReservationTracker::DecreaseReservation(int64_t bytes) { + DecreaseReservationInternal(bytes, false); +} + +void ReservationTracker::DecreaseReservationInternal( + int64_t bytes, bool is_child_reservation) { + lock_guard<SpinLock> l(lock_); + DecreaseReservationInternalLocked(bytes, is_child_reservation); +} + +void ReservationTracker::DecreaseReservationInternalLocked( + int64_t bytes, bool is_child_reservation) { + DCHECK(initialized_); + // A negative amount would grow the reservation without any of the limit checks + // performed by IncreaseReservationInternalLocked(). + DCHECK_GE(bytes, 0); + DCHECK_GE(reservation_, bytes); + if (bytes == 0) return; + if (is_child_reservation) child_reservations_ -= bytes; + UpdateReservation(-bytes); + // The reservation should be returned up the tree.
+ if (mem_tracker_ != NULL) { + if (GetParentMemTracker() == NULL) { + mem_tracker_->Release(bytes); + } else { + mem_tracker_->ReleaseLocal(bytes, GetParentMemTracker()); + } + } + if (parent_ != NULL) parent_->DecreaseReservationInternal(bytes, true); + CheckConsistency(); +} + +void ReservationTracker::AllocateFrom(int64_t bytes) { + lock_guard<SpinLock> l(lock_); + DCHECK(initialized_); + DCHECK_GE(bytes, 0); + DCHECK_LE(bytes, unused_reservation()); + UpdateUsedReservation(bytes); + CheckConsistency(); +} + +void ReservationTracker::ReleaseTo(int64_t bytes) { + lock_guard<SpinLock> l(lock_); + DCHECK(initialized_); + DCHECK_GE(bytes, 0); + DCHECK_LE(bytes, used_reservation_); + UpdateUsedReservation(-bytes); + CheckConsistency(); +} + +int64_t ReservationTracker::GetReservation() { + lock_guard<SpinLock> l(lock_); + DCHECK(initialized_); + return reservation_; +} + +int64_t ReservationTracker::GetUsedReservation() { + lock_guard<SpinLock> l(lock_); + DCHECK(initialized_); + return used_reservation_; +} + +int64_t ReservationTracker::GetUnusedReservation() { + lock_guard<SpinLock> l(lock_); + DCHECK(initialized_); + return unused_reservation(); +} + +int64_t ReservationTracker::GetChildReservations() { + lock_guard<SpinLock> l(lock_); + DCHECK(initialized_); + return child_reservations_; +} + +void ReservationTracker::CheckConsistency() const { + // Check internal invariants. The reservation must be non-negative, stay within its + // limit and cover both the bytes used at this tracker and the bytes handed to + // children; the profile counters must mirror the tracker's current state.
+ DCHECK_GE(reservation_, 0); + DCHECK_LE(reservation_, reservation_limit_); + DCHECK_GE(child_reservations_, 0); + DCHECK_GE(used_reservation_, 0); + DCHECK_LE(used_reservation_ + child_reservations_, reservation_); + + DCHECK_EQ(reservation_, counters_.peak_reservation->current_value()); + DCHECK_LE(reservation_, counters_.peak_reservation->value()); + DCHECK_EQ(used_reservation_, counters_.peak_used_reservation->current_value()); + DCHECK_LE(used_reservation_, counters_.peak_used_reservation->value()); + DCHECK_EQ(reservation_limit_, counters_.reservation_limit->value()); +} + +void ReservationTracker::UpdateUsedReservation(int64_t delta) { + used_reservation_ += delta; + COUNTER_SET(counters_.peak_used_reservation, used_reservation_); + CheckConsistency(); +} + +void ReservationTracker::UpdateReservation(int64_t delta) { + reservation_ += delta; + COUNTER_SET(counters_.peak_reservation, reservation_); + CheckConsistency(); +} + +string ReservationTracker::DebugString() { + lock_guard<SpinLock> l(lock_); + if (!initialized_) return "<ReservationTracker>: uninitialized"; + + string parent_debug_string = parent_ == NULL ? "NULL" : parent_->DebugString(); + return Substitute( + "<ReservationTracker>: reservation_limit $0 reservation $1 used_reservation $2 " + "child_reservations $3 parent:\n$4", + reservation_limit_, reservation_, used_reservation_, child_reservations_, + parent_debug_string); +} +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/bufferpool/reservation-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h new file mode 100644 index 0000000..c786f80 --- /dev/null +++ b/be/src/runtime/bufferpool/reservation-tracker.h @@ -0,0 +1,248 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_RESERVATION_TRACKER_H +#define IMPALA_RUNTIME_RESERVATION_TRACKER_H + +#include <stdint.h> +#include <boost/scoped_ptr.hpp> +#include <boost/thread/locks.hpp> +#include <string> + +#include "runtime/bufferpool/reservation-tracker-counters.h" +#include "common/status.h" +#include "util/spinlock.h" + +namespace impala { + +class DummyProfile; +class MemTracker; +class RuntimeProfile; + +/// A tracker for a hierarchy of buffer pool memory reservations, denominated in bytes. +/// A hierarchy of ReservationTrackers provides a mechanism for subdividing buffer pool +/// memory and enforcing upper and lower bounds on memory usage. +/// +/// The root of the tracker tree enforces a global maximum, which is distributed among its +/// children. Each tracker in the tree has a 'reservation': the total bytes of buffer pool +/// memory it is entitled to use. The reservation is inclusive of any memory that is +/// already allocated from the reservation, i.e. using a reservation to allocate memory +/// does not subtract from the reservation. +/// +/// A reservation can be used directly at the tracker by calling AllocateFrom(), or +/// distributed to children of the tracker for the children's reservations.
Each tracker +/// in the tree can use up to its reservation without checking parent trackers. To +/// increase its reservation, a tracker must use some of its parent's reservation (and +/// perhaps increase reservations all the way to the root of the tree). +/// +/// Each tracker also has a maximum reservation that is enforced. E.g. if the root of the +/// tracker hierarchy is the global tracker for the Impala daemon and the next level of +/// the hierarchy is made up of per-query trackers, then the maximum reservation +/// mechanism can enforce both process-level and query-level limits on reservations. +/// +/// Invariants: +/// * A tracker's reservation is at most its reservation limit: reservation <= limit +/// * A tracker's reservation is at least the sum of its children's reservations plus +/// the amount of the reservation used directly at this tracker. The difference is +/// the unused reservation: +/// child_reservations + used_reservation + unused_reservation = reservation. +/// +/// Thread-safety: +/// All public ReservationTracker methods are thread-safe. If multiple threads +/// concurrently invoke methods on a ReservationTracker, each operation is applied +/// atomically to leave the ReservationTracker in a consistent state. Calling threads +/// are responsible for coordinating to avoid violating any method preconditions, +/// e.g. ensuring that there is sufficient unused reservation before calling +/// AllocateFrom(). +/// +/// Integration with MemTracker hierarchy: +/// TODO: we will remove MemTracker and this integration once all memory is accounted via +/// reservations. +/// +/// Each ReservationTracker can optionally have a linked MemTracker. E.g. an exec +/// node's ReservationTracker can be linked with the exec node's MemTracker, so that +/// reservations are included in query memory consumption for the purposes of enforcing +/// memory limits, reporting and logging.
The reservation is accounted as consumption +/// against the linked MemTracker and its ancestors because reserved memory is committed. +/// Allocating from a reservation therefore does not change the consumption reflected in +/// the MemTracker hierarchy. +/// +/// MemTracker limits are only checked via the topmost link (i.e. the query-level +/// trackers): we require that no MemTrackers below this level have limits. +/// +/// We require that the MemTracker hierarchy is consistent with the ReservationTracker +/// hierarchy. I.e. if a ReservationTracker is linked to a MemTracker "A", and its parent +/// is linked to a MemTracker "B", then "B" must be the parent of "A". +class ReservationTracker { + public: + ReservationTracker(); + /// The tracker must be closed (or never initialized) before it is destroyed. + virtual ~ReservationTracker(); + + /// Initializes the root tracker with the given reservation limit in bytes. The initial + /// reservation is 0. + /// If 'profile' is not NULL, the counters defined in ReservationTrackerCounters are + /// added to 'profile'. + void InitRootTracker(RuntimeProfile* profile, int64_t reservation_limit); + + /// Initializes a new ReservationTracker with a parent. + /// 'parent' must be non-NULL. + /// If 'mem_tracker' is not NULL, reservations for this ReservationTracker and its + /// children will be counted as consumption against 'mem_tracker'. + /// 'reservation_limit' is the maximum reservation for this tracker in bytes and must + /// be non-negative. + /// If 'profile' is not NULL, the counters in 'counters_' are added to 'profile'. + void InitChildTracker(RuntimeProfile* profile, ReservationTracker* parent, + MemTracker* mem_tracker, int64_t reservation_limit); + + /// If the tracker is initialized, deregister the ReservationTracker from its parent, + /// relinquishing all this tracker's reservation. All of the reservation must be unused + /// and all the tracker's children must be closed before calling this method. + void Close(); + + /// Request to increase reservation by 'bytes'. The request is either granted in + /// full or not at all.
Uses any unused reservation on ancestors and increase + /// ancestors' reservations if needed to fit the increased reservation. + /// Returns true if the reservation increase is granted, or false if not granted. + /// If the reservation is not granted, no modifications are made to the state of + /// any ReservationTrackers. + bool IncreaseReservation(int64_t bytes); + + /// Tries to ensure that 'bytes' of unused reservation is available. If not already + /// available, tries to increase the reservation such that the unused reservation is + /// exactly equal to 'bytes'. Uses any unused reservation on ancestors and increase + /// ancestors' reservations if needed to fit the increased reservation. + /// Returns true if the reservation increase was successful or not necessary. + bool IncreaseReservationToFit(int64_t bytes); + + /// Decrease tracker's reservation by 'bytes'. This tracker's reservation must be at + /// least 'bytes' before calling this method. + /// TODO: decide on and implement policy for how far to release the reservation up + /// the tree. Currently the reservation is released all the way to the root. + void DecreaseReservation(int64_t bytes); + + /// Allocate 'bytes' from the reservation. The tracker must have at least 'bytes' + /// unused reservation before calling this method. + void AllocateFrom(int64_t bytes); + + /// Release 'bytes' of previously allocated memory. The used reservation is + /// decreased by 'bytes'. The used reservation must be at least 'bytes' before + /// calling this method. + void ReleaseTo(int64_t bytes); + + /// Returns the amount of the reservation in bytes. + int64_t GetReservation(); + + /// Returns the current amount of the reservation used at this tracker in bytes, not + /// including reservations of children. + int64_t GetUsedReservation(); + + /// Returns the amount of the reservation neither used nor given to children's + /// reservations at this tracker in bytes.
+ int64_t GetUnusedReservation(); + + /// Returns the total reservations of children in bytes. + int64_t GetChildReservations(); + + /// Returns a human-readable description of this tracker's limit and reservations, + /// followed by the description of its parent (if any). + std::string DebugString(); + + private: + /// Returns the amount of 'reservation_' that is unused. + inline int64_t unused_reservation() const { + return reservation_ - used_reservation_ - child_reservations_; + } + + /// Returns the parent's memtracker if 'parent_' is non-NULL, or NULL otherwise. + MemTracker* GetParentMemTracker() const { + return parent_ == NULL ? NULL : parent_->mem_tracker_; + } + + /// Initializes 'counters_', storing the counters in 'profile'. + /// If 'profile' is NULL, creates a dummy profile to store the counters. + /// 'max_reservation' is the value recorded in the reservation limit counter. + void InitCounters(RuntimeProfile* profile, int64_t max_reservation); + + /// Internal helper for IncreaseReservation(). If 'use_existing_reservation' is true, + /// increase by the minimum amount so that 'bytes' fits in the reservation, otherwise + /// just increase by 'bytes'. If 'is_child_reservation' is true, also increase + /// 'child_reservations_' by 'bytes'. + /// 'lock_' must be held by caller. + bool IncreaseReservationInternalLocked( + int64_t bytes, bool use_existing_reservation, bool is_child_reservation); + + /// Update consumption on linked MemTracker. For the topmost link, return false if + /// this failed because it would exceed a memory limit. If there is no linked + /// MemTracker, just returns true. + /// TODO: remove once we account all memory via ReservationTrackers. + bool TryUpdateMemTracker(int64_t reservation_increase); + + /// Internal helper for DecreaseReservation(). This behaves the same as + /// DecreaseReservation(), except when 'is_child_reservation' is true it decreases + /// 'child_reservations_' by 'bytes'. + void DecreaseReservationInternal(int64_t bytes, bool is_child_reservation); + + /// Same as DecreaseReservationInternal(), but 'lock_' must be held by caller.
+ void DecreaseReservationInternalLocked(int64_t bytes, bool is_child_reservation); + + /// Check the internal consistency of the ReservationTracker and DCHECKs if in an + /// inconsistent state. + /// 'lock_' must be held by caller. + void CheckConsistency() const; + + /// Increase or decrease 'used_reservation_' and update profile counters accordingly. + /// 'lock_' must be held by caller. + void UpdateUsedReservation(int64_t delta); + + /// Increase or decrease 'reservation_' and update profile counters accordingly. + /// 'lock_' must be held by caller. + void UpdateReservation(int64_t delta); + + /// lock_ protects all members. In a hierarchy of trackers, locks must be acquired + /// from the bottom-up. + SpinLock lock_; + + /// True if the tracker is initialized. + bool initialized_; + + /// A dummy profile to hold the counters in 'counters_' in the case that no profile + /// is provided. + boost::scoped_ptr<DummyProfile> dummy_profile_; + + /// The RuntimeProfile counters for this tracker. + /// All non-NULL if 'initialized_' is true. + ReservationTrackerCounters counters_; + + /// The parent of this tracker in the hierarchy. Does not change after initialization. + ReservationTracker* parent_; + + /// If non-NULL, reservations are counted as memory consumption against this tracker. + /// Does not change after initialization. Not owned. + /// TODO: remove once all memory is accounted via ReservationTrackers. + MemTracker* mem_tracker_; + + /// The maximum reservation in bytes that this tracker can have. + int64_t reservation_limit_; + + /// This tracker's current reservation in bytes. 'reservation_' <= 'reservation_limit_'. + int64_t reservation_; + + /// Total reservation of children in bytes. This is included in 'reservation_'. + /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'. + int64_t child_reservations_; + + /// The amount of the reservation currently used by this tracker in bytes.
+ /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'. + int64_t used_reservation_; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/runtime/mem-tracker.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc index a9de160..dc855da 100644 --- a/be/src/runtime/mem-tracker.cc +++ b/be/src/runtime/mem-tracker.cc @@ -22,7 +22,7 @@ #include <gperftools/malloc_extension.h> #include <gutil/strings/substitute.h> -#include "bufferpool/reservation-tracker-counters.h" +#include "runtime/bufferpool/reservation-tracker-counters.h" #include "runtime/exec-env.h" #include "runtime/runtime-state.h" #include "util/debug-util.h"
