IMPALA-5113: fix dirty unpinned invariant

There were two bugs:
* The invariant was too strict and didn't take into account multiple
  pins of pages (which don't use buffers and therefore shouldn't
  count); see the sketch below.
* The invariant wasn't enforced when reclaiming a clean page.
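To make the first bug concrete, here is a minimal, self-contained C++ sketch of the corrected local invariant. All names (PageSketch, InvariantHolds) are illustrative stand-ins, not the actual BufferPool members: the point is that a page pinned N times still occupies exactly one buffer, so it contributes its length to the invariant once, not once per pin.

  #include <cassert>
  #include <cstdint>
  #include <vector>

  // Hypothetical stand-in for per-page state, for illustration only.
  struct PageSketch {
    int64_t len;    // Page length in bytes.
    int pin_count;  // May be > 1; extra pins consume reservation but no extra buffer.
  };

  // Corrected invariant:
  //   reservation >= buffers handed out + pinned page bytes + dirty page bytes.
  // Each pinned page counts its length exactly once, regardless of pin_count.
  bool InvariantHolds(int64_t reservation, int64_t buffers_allocated_bytes,
                      const std::vector<PageSketch>& pinned_pages,
                      int64_t dirty_unpinned_bytes, int64_t in_flight_write_bytes) {
    int64_t pinned_bytes = 0;
    for (const PageSketch& page : pinned_pages) {
      assert(page.pin_count >= 1);
      pinned_bytes += page.len;  // Counted once per page, not per pin.
    }
    return reservation >= buffers_allocated_bytes + pinned_bytes
        + dirty_unpinned_bytes + in_flight_write_bytes;
  }

  int main() {
    // One 1024-byte page pinned twice (consuming 2048 bytes of reservation)
    // plus a 1024-byte dirty unpinned page.
    std::vector<PageSketch> pinned = {{1024, 2}};
    // Old check: unused reservation (2048 - 2*1024 = 0) >= dirty bytes (1024)
    // fails spuriously and forces a needless flush. Corrected check passes:
    assert(InvariantHolds(/*reservation=*/2048, /*buffers_allocated_bytes=*/0,
        pinned, /*dirty_unpinned_bytes=*/1024, /*in_flight_write_bytes=*/0));
    return 0;
  }

Under the old formulation ("unused reservation >= dirty unpinned pages"), each extra pin shrank the apparent headroom without using any buffer; that is the reservation double-counting the next paragraph refers to.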
Change the logic so that it's implemented in terms of pages/buffers in
various states (this avoids the reservation double-counting and more
directly expresses the intent). To aid in this, refactor the page lists
to use a wrapper that tracks the # of bytes of pages in each list.

Testing:
Added a unit test that reproduces the issue and added stricter DCHECKs
to detect the issue in future.

Change-Id: I07e08acb6cf6839bfccbd09258c093b1c8252b25
Reviewed-on: http://gerrit.cloudera.org:8080/6469
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/874d20d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/874d20d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/874d20d0

Branch: refs/heads/master
Commit: 874d20d0fabaf0572202a6e65dc55d334229ca6b
Parents: e98c88f
Author: Tim Armstrong <[email protected]>
Authored: Fri Mar 24 08:52:56 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Mon Mar 27 22:52:33 2017 +0000

----------------------------------------------------------------------
 .../runtime/bufferpool/buffer-pool-internal.h | 242 ++++++++++++-------
 be/src/runtime/bufferpool/buffer-pool-test.cc |  32 +++
 be/src/runtime/bufferpool/buffer-pool.cc      | 130 +++++-----
 be/src/runtime/bufferpool/buffer-pool.h       |   1 +
 be/src/util/internal-queue.h                  |  10 +-
 5 files changed, 263 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/874d20d0/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 04d76bb..f0dfa09 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -56,15 +56,14 @@
 /// reservations are not overcommitted (they shouldn't be), this global invariant can be
 /// maintained by enforcing a local invariant for every client:
 ///
-/// unused reservation >= dirty unpinned pages
+/// reservation >= BufferHandles returned to client
+///                + pinned pages + dirty pages (dirty unpinned or write in flight)
 ///
 /// The local invariant is maintained by writing pages to disk as the first step of any
-/// operation that uses reservation. I.e. the R.H.S. of the invariant must be decreased
-/// before the L.H.S. can be decreased. These operations block waiting for enough writes
-/// to complete to satisfy the invariant.
-/// TODO: this invariant can be broken if a client calls DecreaseReservation() on the
-/// ReservationTracker. We should refactor so that DecreaseReservation() goes through
-/// the client before closing IMPALA-3202.
+/// operation that allocates a new buffer or reclaims buffers from clean pages. I.e.
+/// "dirty pages" must be decreased before one of the other values on the R.H.S. of the
+/// invariant can be increased. Operations block waiting for enough writes to complete
+/// to satisfy the invariant.
 
 #ifndef IMPALA_RUNTIME_BUFFER_POOL_INTERNAL_H
 #define IMPALA_RUNTIME_BUFFER_POOL_INTERNAL_H
@@ -79,8 +78,106 @@
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "util/condition-variable.h"
 
+// Ensure that DCheckConsistency() function calls get removed in release builds.
+#ifndef NDEBUG
+#define DCHECK_CONSISTENCY() DCheckConsistency()
+#else
+#define DCHECK_CONSISTENCY()
+#endif
+
 namespace impala {
 
+/// The internal representation of a page, which can be pinned or unpinned. See the
+/// class comment for explanation of the different page states.
+struct BufferPool::Page : public InternalList<Page>::Node {
+  Page(Client* client, int64_t len) : client(client), len(len), pin_count(0) {}
+
+  std::string DebugString();
+
+  // Helper for BufferPool::DebugString().
+  static bool DebugStringCallback(std::stringstream* ss, BufferPool::Page* page);
+
+  /// The client that the page belongs to.
+  Client* const client;
+
+  /// The length of the page in bytes.
+  const int64_t len;
+
+  /// The pin count of the page. Only accessed in contexts that are passed the associated
+  /// PageHandle, so it cannot be accessed by multiple threads concurrently.
+  int pin_count;
+
+  /// Non-null if there is a write in flight, the page is clean, or the page is evicted.
+  std::unique_ptr<TmpFileMgr::WriteHandle> write_handle;
+
+  /// Condition variable signalled when a write for this page completes. Protected by
+  /// client->lock_.
+  ConditionVariable write_complete_cv_;
+
+  /// This lock must be held when accessing 'buffer' if the page is unpinned and not
+  /// evicted (i.e. it is safe to access 'buffer' if the page is pinned or evicted).
+  SpinLock buffer_lock;
+
+  /// Buffer with the page's contents. Closed only iff page is evicted. Open otherwise.
+  BufferHandle buffer;
+};
+
+/// Wrapper around InternalList<Page> that tracks the # of bytes in the list.
+class BufferPool::PageList {
+ public:
+  PageList() : bytes_(0) {}
+  ~PageList() {
+    // Clients always empty out their list before destruction.
+    DCHECK(list_.empty());
+    DCHECK_EQ(0, bytes_);
+  }
+
+  void Enqueue(Page* page) {
+    list_.Enqueue(page);
+    bytes_ += page->len;
+  }
+
+  bool Remove(Page* page) {
+    if (list_.Remove(page)) {
+      bytes_ -= page->len;
+      return true;
+    }
+    return false;
+  }
+
+  Page* Dequeue() {
+    Page* page = list_.Dequeue();
+    if (page != nullptr) {
+      bytes_ -= page->len;
+    }
+    return page;
+  }
+
+  Page* PopBack() {
+    Page* page = list_.PopBack();
+    if (page != nullptr) {
+      bytes_ -= page->len;
+    }
+    return page;
+  }
+
+  void Iterate(boost::function<bool(Page*)> fn) { list_.Iterate(fn); }
+  bool Contains(Page* page) { return list_.Contains(page); }
+  Page* tail() { return list_.tail(); }
+  bool empty() const { return list_.empty(); }
+  int size() const { return list_.size(); }
+  int64_t bytes() const { return bytes_; }
+
+  void DCheckConsistency() {
+    DCHECK_GE(bytes_, 0);
+    DCHECK_EQ(list_.empty(), bytes_ == 0);
+  }
+
+ private:
+  InternalList<Page> list_;
+  int64_t bytes_;
+};
+
 /// The internal state for the client.
 class BufferPool::Client {
  public:
@@ -90,18 +187,15 @@ class BufferPool::Client {
 
   ~Client() {
     DCHECK_EQ(0, num_pages_);
-    DCHECK_EQ(0, pinned_pages_.size());
-    DCHECK_EQ(0, dirty_unpinned_pages_.size());
-    DCHECK_EQ(0, in_flight_write_pages_.size());
+    DCHECK_EQ(0, buffers_allocated_bytes_);
   }
 
   /// Release reservation for this client.
   void Close() { reservation_.Close(); }
 
-  /// Add a new pinned page 'page' to the pinned pages list. 'page' must not be in any
-  /// other lists. Neither the client's lock nor page->buffer_lock should be held by the
-  /// caller.
-  void AddNewPinnedPage(Page* page);
+  /// Create a pinned page using 'buffer', which was allocated using AllocateBuffer().
+  /// No client or page locks should be held by the caller.
+  Page* CreatePinnedPage(BufferHandle&& buffer);
 
   /// Reset 'handle', clean up references to handle->page and release any resources
   /// associated with handle->page. If the page is pinned, 'out_buffer' can be passed in
@@ -113,7 +207,7 @@ class BufferPool::Client {
   /// Updates client state to reflect that 'page' is now a dirty unpinned page. May
   /// initiate writes for this or other dirty unpinned pages.
   /// Neither the client's lock nor page->buffer_lock should be held by the caller.
-  void MoveToDirtyUnpinned(int64_t unused_reservation, Page* page);
+  void MoveToDirtyUnpinned(Page* page);
 
   /// Move an unpinned page to the pinned state, moving between data structures and
   /// reading from disk if necessary. Returns once the page's buffer is allocated
@@ -121,26 +215,24 @@ class BufferPool::Client {
   /// handle->page_->buffer_lock should be held by the caller.
   Status MoveToPinned(ClientHandle* client, PageHandle* handle);
 
-  /// Must be called before allocating a buffer to ensure that the client can allocate
-  /// 'allocation_len' bytes without pinned bytes plus dirty unpinned bytes exceeding the
-  /// client's reservation. No page or client locks should be held by the caller.
-  Status CleanPagesBeforeAllocation(
-      ReservationTracker* reservation, int64_t allocation_len);
-
-  /// Same as CleanPagesBeforeAllocation(), except 'lock_' must be held by 'client_lock'.
-  /// 'client_lock' may be released temporarily while waiting for writes to complete.
-  Status CleanPagesBeforeAllocationLocked(boost::unique_lock<boost::mutex>* client_lock,
-      ReservationTracker* reservation, int64_t allocation_len);
-
-  /// Initiates asynchronous writes of dirty unpinned pages to disk. Ensures that at
-  /// least 'min_bytes_to_write' bytes of writes will be written asynchronously. May
-  /// start writes more aggressively so that I/O and compute can be overlapped. If
-  /// any errors are encountered, 'write_status_' is set. 'write_status_' must therefore
-  /// be checked before reading back any pages. 'lock_' must be held by the caller.
-  void WriteDirtyPagesAsync(int64_t min_bytes_to_write = 0);
+  /// Must be called once before allocating a buffer of 'len' via the AllocateBuffer()
+  /// API to deduct from the client's reservation and update internal accounting. Cleans
+  /// dirty pages if needed to satisfy the buffer pool's internal invariants. No page or
+  /// client locks should be held by the caller.
+  Status PrepareToAllocateBuffer(int64_t len);
+
+  /// Called after a buffer of 'len' is freed via the FreeBuffer() API to update
+  /// internal accounting and release the buffer to the client's reservation. No page or
+  /// client locks should be held by the caller.
+  void FreedBuffer(int64_t len) {
+    boost::lock_guard<boost::mutex> cl(lock_);
+    reservation_.ReleaseTo(len);
+    buffers_allocated_bytes_ -= len;
+    DCHECK_CONSISTENCY();
+  }
 
   /// Wait for the in-flight write for 'page' to complete.
-  /// 'lock_' must be held by the caller via 'client_lock'. page->bufffer_lock should
+  /// 'lock_' must be held by the caller via 'client_lock'. page->buffer_lock should
   /// not be held.
   void WaitForWrite(boost::unique_lock<boost::mutex>* client_lock, Page* page);
 
@@ -158,17 +250,32 @@ class BufferPool::Client {
 
  private:
   // Check consistency of client, DCHECK if inconsistent. 'lock_' must be held.
   void DCheckConsistency() {
-    DCHECK_GE(in_flight_write_bytes_, 0);
-    DCHECK_LE(in_flight_write_bytes_, dirty_unpinned_bytes_);
+    DCHECK_GE(buffers_allocated_bytes_, 0);
+    pinned_pages_.DCheckConsistency();
+    dirty_unpinned_pages_.DCheckConsistency();
+    in_flight_write_pages_.DCheckConsistency();
     DCHECK_LE(pinned_pages_.size() + dirty_unpinned_pages_.size()
         + in_flight_write_pages_.size(), num_pages_);
-    if (in_flight_write_pages_.empty()) DCHECK_EQ(0, in_flight_write_bytes_);
-    if (in_flight_write_pages_.empty() && dirty_unpinned_pages_.empty()) {
-      DCHECK_EQ(0, dirty_unpinned_bytes_);
-    }
+    // Check that we flushed enough pages to disk given our eviction policy.
+    DCHECK_GE(reservation_.GetReservation(), buffers_allocated_bytes_
+        + pinned_pages_.bytes() + dirty_unpinned_pages_.bytes()
+        + in_flight_write_pages_.bytes());
   }
 
+  /// Must be called once before allocating or reclaiming a buffer of 'len'. Ensures that
+  /// enough dirty pages are flushed to disk to satisfy the buffer pool's internal
+  /// invariants after the allocation. 'lock_' should be held by the caller via
+  /// 'client_lock'.
+  Status CleanPages(boost::unique_lock<boost::mutex>* client_lock, int64_t len);
+
+  /// Initiates asynchronous writes of dirty unpinned pages to disk. Ensures that at
+  /// least 'min_bytes_to_write' bytes of writes will be written asynchronously. May
+  /// start writes more aggressively so that I/O and compute can be overlapped. If
+  /// any errors are encountered, 'write_status_' is set. 'write_status_' must therefore
+  /// be checked before reading back any pages. 'lock_' must be held by the caller.
+  void WriteDirtyPagesAsync(int64_t min_bytes_to_write = 0);
+
   /// Called when a write for 'page' completes.
   void WriteCompleteCallback(Page* page, const Status& write_status);
 
@@ -217,61 +324,20 @@ class BufferPool::Client {
   /// pages are destroyed before the client.
   int64_t num_pages_;
 
-  /// All pinned pages for this client. Only used for debugging.
-  InternalList<Page> pinned_pages_;
+  /// Total bytes of buffers in BufferHandles returned to clients (i.e. obtained from
+  /// AllocateBuffer() or ExtractBuffer()).
+  int64_t buffers_allocated_bytes_;
+
+  /// All pinned pages for this client.
+  PageList pinned_pages_;
 
   /// Dirty unpinned pages for this client for which writes are not in flight. Page
   /// writes are started in LIFO order, because operators typically have sequential
   /// access patterns where the most recently evicted page will be last to be read.
-  InternalList<Page> dirty_unpinned_pages_;
+  PageList dirty_unpinned_pages_;
 
   /// Dirty unpinned pages for this client for which writes are in flight.
-  InternalList<Page> in_flight_write_pages_;
-
-  /// Total bytes of dirty unpinned pages for this client.
-  int64_t dirty_unpinned_bytes_;
-
-  /// Total bytes of in-flight writes for dirty unpinned pages. Bytes accounted here
-  /// are also accounted in 'dirty_unpinned_bytes_'.
-  int64_t in_flight_write_bytes_;
-};
-
-/// The internal representation of a page, which can be pinned or unpinned. See the
-/// class comment for explanation of the different page states.
-///
-/// Code manipulating the page is responsible for acquiring 'lock' when reading or
-/// modifying the page.
-struct BufferPool::Page : public InternalList<Page>::Node {
-  Page(Client* client, int64_t len) : client(client), len(len), pin_count(0) {}
-
-  std::string DebugString();
-
-  // Helper for BufferPool::DebugString().
-  static bool DebugStringCallback(std::stringstream* ss, BufferPool::Page* page);
-
-  /// The client that the page belongs to.
-  Client* const client;
-
-  /// The length of the page in bytes.
-  const int64_t len;
-
-  /// The pin count of the page. Only accessed in contexts that are passed the associated
-  /// PageHandle, so it cannot be accessed by multiple threads concurrently.
-  int pin_count;
-
-  /// Non-null if there is a write in flight, the page is clean, or the page is evicted.
-  std::unique_ptr<TmpFileMgr::WriteHandle> write_handle;
-
-  /// Condition variable signalled when a write for this page completes. Protected by
-  /// client->lock_.
-  ConditionVariable write_complete_cv_;
-
-  /// This lock must be held when accessing 'buffer' if the page is unpinned and not
-  /// evicted (i.e. it is safe to access 'buffer' if the page is pinned or evicted).
-  SpinLock buffer_lock;
-
-  /// Buffer with the page's contents. Closed only iff page is evicted. Open otherwise.
-  BufferHandle buffer;
+  PageList in_flight_write_pages_;
 };
 
 }


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/874d20d0/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index d4640ac..c71ab60 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -636,6 +636,38 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) {
   pool.FreeBuffer(&clients[1], &buffer);
   for (BufferPool::ClientHandle& client : clients) pool.DeregisterClient(&client);
 }
+
+/// Regression test for IMPALA-5113 where the page flushing invariant didn't correctly
+/// take multiply pinned pages into account.
+TEST_F(BufferPoolTest, MultiplyPinnedPageAccounting) {
+  const int NUM_BUFFERS = 3;
+  const int64_t TOTAL_BYTES = NUM_BUFFERS * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES);
+
+  BufferPool::ClientHandle client;
+  RuntimeProfile* profile = NewProfile();
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      NULL, TOTAL_BYTES, profile, &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_BYTES));
+
+  BufferPool::PageHandle handle1, handle2;
+  BufferPool::BufferHandle buffer;
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
+  pool.Unpin(&client, &handle1);
+  ASSERT_OK(pool.Pin(&client, &handle2));
+  ASSERT_OK(pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &buffer));
+
+  // We shouldn't need to flush anything to disk since we have only three pages/buffers in
+  // memory. Rely on DCHECKs to check invariants and check we didn't evict the page.
+  EXPECT_FALSE(IsEvicted(&handle1)) << handle1.DebugString();
+
+  pool.DestroyPage(&client, &handle1);
+  pool.DestroyPage(&client, &handle2);
+  pool.FreeBuffer(&client, &buffer);
+  pool.DeregisterClient(&client);
+}
 }
 
 int main(int argc, char** argv) {


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/874d20d0/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 2503beb..482db10 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -127,12 +127,8 @@ Status BufferPool::CreatePage(ClientHandle* client, int64_t len, PageHandle* han
   BufferHandle buffer;
   // No changes have been made to state yet, so we can cleanly return on error.
   RETURN_IF_ERROR(AllocateBuffer(client, len, &buffer));
-
-  Page* page = new Page(client->impl_, len);
-  page->buffer = std::move(buffer);
+  Page* page = client->impl_->CreatePinnedPage(move(buffer));
   handle->Open(page, client);
-  page->pin_count++;
-  client->impl_->AddNewPinnedPage(page);
   return Status::OK();
 }
 
@@ -178,7 +174,7 @@ void BufferPool::Unpin(ClientHandle* client, PageHandle* handle) {
   reservation->ReleaseTo(page->len);
 
   if (--page->pin_count > 0) return;
-  client->impl_->MoveToDirtyUnpinned(reservation->GetUnusedReservation(), page);
+  client->impl_->MoveToDirtyUnpinned(page);
   COUNTER_ADD(client->impl_->counters().total_unpinned_bytes, handle->len());
   COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, handle->len());
 }
@@ -199,10 +195,13 @@ void BufferPool::ExtractBuffer(
 
 Status BufferPool::AllocateBuffer(
     ClientHandle* client, int64_t len, BufferHandle* handle) {
-  ReservationTracker* reservation = client->impl_->reservation();
-  RETURN_IF_ERROR(client->impl_->CleanPagesBeforeAllocation(reservation, len));
-  reservation->AllocateFrom(len);
-  return AllocateBufferInternal(client, len, handle);
+  RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len));
+  Status status = AllocateBufferInternal(client, len, handle);
+  if (!status.ok()) {
+    // Allocation failed - update client's accounting to reflect the failure.
+    client->impl_->FreedBuffer(len);
+  }
+  return status;
 }
 
 Status BufferPool::AllocateBufferInternal(
@@ -232,8 +231,9 @@ Status BufferPool::AllocateBufferInternal(
 void BufferPool::FreeBuffer(ClientHandle* client, BufferHandle* handle) {
   if (!handle->is_open()) return; // Should be idempotent.
   DCHECK_EQ(client, handle->client_);
-  client->impl_->reservation()->ReleaseTo(handle->len_);
+  int64_t len = handle->len_;
   FreeBufferInternal(handle);
+  client->impl_->FreedBuffer(len);
 }
 
 void BufferPool::FreeBufferInternal(BufferHandle* handle) {
@@ -281,9 +281,7 @@ void BufferPool::AddCleanPage(const unique_lock<mutex>& client_lock, Page* page)
 bool BufferPool::RemoveCleanPage(const unique_lock<mutex>& client_lock, Page* page) {
   page->client->DCheckHoldsLock(client_lock);
   lock_guard<SpinLock> cpl(clean_pages_lock_);
-  bool found = clean_pages_.Contains(page);
-  if (found) clean_pages_.Remove(page);
-  return found;
+  return clean_pages_.Remove(page);
 }
 
 Status BufferPool::EvictCleanPages(int64_t bytes_to_evict) {
@@ -347,8 +345,7 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
     file_group_(file_group),
     name_(name),
     num_pages_(0),
-    dirty_unpinned_bytes_(0),
-    in_flight_write_bytes_(0) {
+    buffers_allocated_bytes_(0) {
   reservation_.InitChildTracker(
       profile, parent_reservation, mem_tracker, reservation_limit);
   counters_.get_buffer_time = ADD_TIMER(profile, "BufferPoolGetBufferTime");
@@ -364,11 +361,19 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
       ADD_COUNTER(profile, "BufferPoolTotalUnpinnedBytes", TUnit::BYTES);
 }
 
-void BufferPool::Client::AddNewPinnedPage(Page* page) {
-  DCHECK_GT(page->pin_count, 0);
+BufferPool::Page* BufferPool::Client::CreatePinnedPage(BufferHandle&& buffer) {
+  Page* page = new Page(this, buffer.len());
+  page->buffer = move(buffer);
+  page->pin_count = 1;
+
   boost::lock_guard<boost::mutex> lock(lock_);
+  // The buffer is transferred to the page so will be accounted for in
+  // pinned_pages_.bytes() instead of buffers_allocated_bytes_.
+  buffers_allocated_bytes_ -= page->len;
   pinned_pages_.Enqueue(page);
   ++num_pages_;
+  DCHECK_CONSISTENCY();
+  return page;
 }
 
 void BufferPool::Client::DestroyPageInternal(
@@ -378,12 +383,8 @@ void BufferPool::Client::DestroyPageInternal(
   // Remove the page from the list that it is currently present in (if any).
   {
     unique_lock<mutex> cl(lock_);
-    if (pinned_pages_.Contains(page)) {
-      pinned_pages_.Remove(page);
-    } else if (dirty_unpinned_pages_.Contains(page)) {
-      dirty_unpinned_pages_.Remove(page);
-      dirty_unpinned_bytes_ -= page->len;
-    } else {
+    // First try to remove from the pinned or dirty unpinned lists.
+    if (!pinned_pages_.Remove(page) && !dirty_unpinned_pages_.Remove(page)) {
       // The page either has a write in flight, is clean, or is evicted.
       // Let the write complete, if in flight.
       WaitForWrite(&cl, page);
@@ -401,6 +402,7 @@ void BufferPool::Client::DestroyPageInternal(
   if (out_buffer != NULL) {
     DCHECK(page->buffer.is_open());
     *out_buffer = std::move(page->buffer);
+    buffers_allocated_bytes_ += out_buffer->len();
   } else if (page->buffer.is_open()) {
     pool_->FreeBufferInternal(&page->buffer);
   }
@@ -408,15 +410,15 @@ void BufferPool::Client::DestroyPageInternal(
   handle->Reset();
 }
 
-void BufferPool::Client::MoveToDirtyUnpinned(int64_t unused_reservation, Page* page) {
+void BufferPool::Client::MoveToDirtyUnpinned(Page* page) {
   // Only valid to unpin pages if spilling is enabled.
   DCHECK(spilling_enabled());
   DCHECK_EQ(0, page->pin_count);
   unique_lock<mutex> lock(lock_);
+  DCHECK_CONSISTENCY();
   DCHECK(pinned_pages_.Contains(page));
   pinned_pages_.Remove(page);
   dirty_unpinned_pages_.Enqueue(page);
-  dirty_unpinned_bytes_ += page->len;
 
   // Check if we should initiate writes for this (or another) dirty page.
   WriteDirtyPagesAsync();
 }
 
@@ -425,6 +427,7 @@ void BufferPool::Client::MoveToDirtyUnpinned(int64_t unused_reservation, Page* p
 Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle) {
   Page* page = handle->page_;
   unique_lock<mutex> cl(lock_);
+  DCHECK_CONSISTENCY();
   // Propagate any write errors that occurred for this client.
   RETURN_IF_ERROR(write_status_);
 
@@ -436,13 +439,15 @@ Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle
     lock_guard<SpinLock> pl(page->buffer_lock);
     evicted = !page->buffer.is_open();
   }
-  if (evicted) return MoveEvictedToPinned(&cl, client, handle);
+  if (evicted) {
+    // We may need to clean some pages to allocate a buffer for the evicted page.
+    RETURN_IF_ERROR(CleanPages(&cl, page->len));
+    return MoveEvictedToPinned(&cl, client, handle);
+  }
 
-  if (dirty_unpinned_pages_.Contains(page)) {
+  if (dirty_unpinned_pages_.Remove(page)) {
     // No writes were initiated for the page - just move it back to the pinned state.
-    dirty_unpinned_pages_.Remove(page);
     pinned_pages_.Enqueue(page);
-    dirty_unpinned_bytes_ -= page->len;
     return Status::OK();
   }
   if (in_flight_write_pages_.Contains(page)) {
@@ -451,6 +456,10 @@ Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle
     WaitForWrite(&cl, page);
     RETURN_IF_ERROR(write_status_); // The write may have set 'write_status_'.
   }
+
+  // At this point we need to either reclaim a clean page or allocate a new buffer.
+  // We may need to clean some pages to do so.
+  RETURN_IF_ERROR(CleanPages(&cl, page->len));
   if (pool_->RemoveCleanPage(cl, page)) {
     // The clean page still has an associated buffer. Just clean up the write, restore
     // the data, and move the page back to the pinned state.
@@ -471,8 +480,6 @@ Status BufferPool::Client::MoveEvictedToPinned(
     unique_lock<mutex>* client_lock, ClientHandle* client, PageHandle* handle) {
   Page* page = handle->page_;
   DCHECK(!page->buffer.is_open());
-  RETURN_IF_ERROR(CleanPagesBeforeAllocationLocked(
-      client_lock, client->impl_->reservation(), page->len));
 
   // Don't hold any locks while allocating or reading back the data. It is safe to modify
   // the page's buffer handle without holding any locks because no concurrent operations
@@ -490,35 +497,43 @@ Status BufferPool::Client::MoveEvictedToPinned(
   file_group_->DestroyWriteHandle(move(page->write_handle));
   client_lock->lock();
   pinned_pages_.Enqueue(page);
+  DCHECK_CONSISTENCY();
   return Status::OK();
 }
 
-Status BufferPool::Client::CleanPagesBeforeAllocation(
-    ReservationTracker* reservation, int64_t allocation_len) {
+Status BufferPool::Client::PrepareToAllocateBuffer(int64_t len) {
   unique_lock<mutex> lock(lock_);
-  return CleanPagesBeforeAllocationLocked(&lock, reservation, allocation_len);
+  // Clean enough pages to allow allocation to proceed without violating our eviction
+  // policy. This can fail, so only update the accounting once success is ensured.
+  RETURN_IF_ERROR(CleanPages(&lock, len));
+  reservation_.AllocateFrom(len);
+  buffers_allocated_bytes_ += len;
+  DCHECK_CONSISTENCY();
+  return Status::OK();
 }
 
-Status BufferPool::Client::CleanPagesBeforeAllocationLocked(
-    unique_lock<mutex>* client_lock, ReservationTracker* reservation,
-    int64_t allocation_len) {
+Status BufferPool::Client::CleanPages(unique_lock<mutex>* client_lock, int64_t len) {
   DCheckHoldsLock(*client_lock);
-  int64_t unused_reservation = reservation->GetUnusedReservation();
-  DCHECK_LE(allocation_len, unused_reservation);
-  int64_t unused_reservation_after_alloc = unused_reservation - allocation_len;
+  DCHECK_CONSISTENCY();
+  // Work out what we need to get bytes of dirty unpinned + in flight pages down to
+  // in order to satisfy the eviction policy.
+  int64_t target_dirty_bytes = reservation_.GetReservation() - buffers_allocated_bytes_
+      - pinned_pages_.bytes() - len;
   // Start enough writes to ensure that the loop condition below will eventually become
   // false (or a write error will be encountered).
-  int64_t min_in_flight_bytes = dirty_unpinned_bytes_ - unused_reservation_after_alloc;
-  WriteDirtyPagesAsync(max<int64_t>(0, min_in_flight_bytes - in_flight_write_bytes_));
+  int64_t min_bytes_to_write =
+      max<int64_t>(0, dirty_unpinned_pages_.bytes() - target_dirty_bytes);
+  WriteDirtyPagesAsync(min_bytes_to_write);
 
   // One of the writes we initiated, or an earlier in-flight write may have hit an error.
   RETURN_IF_ERROR(write_status_);
 
-  // Wait until enough writes have finished that the allocation plus dirty pages won't
-  // exceed our reservation. I.e. so that other clients can immediately get the allocated
+  // Wait until enough writes have finished so that we can make the allocation without
+  // violating the eviction policy. I.e. so that other clients can immediately get the
   // memory they're entitled to without waiting for this client's write to complete.
-  DCHECK_GE(in_flight_write_bytes_, min_in_flight_bytes);
-  while (dirty_unpinned_bytes_ > unused_reservation_after_alloc) {
+  DCHECK_GE(in_flight_write_pages_.bytes(), min_bytes_to_write);
+  while (dirty_unpinned_pages_.bytes() + in_flight_write_pages_.bytes()
+      > target_dirty_bytes) {
     SCOPED_TIMER(counters().write_wait_time);
     write_complete_cv_.Wait(*client_lock);
     RETURN_IF_ERROR(write_status_); // Check if error occurred while waiting.
@@ -528,19 +543,17 @@ Status BufferPool::Client::CleanPagesBeforeAllocationLocked(
 
 void BufferPool::Client::WriteDirtyPagesAsync(int64_t min_bytes_to_write) {
   DCHECK_GE(min_bytes_to_write, 0);
-  DCheckConsistency();
+  DCHECK_LE(min_bytes_to_write, dirty_unpinned_pages_.bytes());
   if (file_group_ == NULL) {
     // Spilling disabled - there should be no unpinned pages to write.
    DCHECK_EQ(0, min_bytes_to_write);
-    DCHECK_EQ(0, dirty_unpinned_bytes_);
+    DCHECK_EQ(0, dirty_unpinned_pages_.bytes());
     return;
   }
   // No point in starting writes if an error occurred because future operations for the
   // client will fail regardless.
   if (!write_status_.ok()) return;
 
-  const int64_t writeable_bytes = dirty_unpinned_bytes_ - in_flight_write_bytes_;
-  DCHECK_LE(min_bytes_to_write, writeable_bytes);
   // Compute the ideal amount of writes to start. We use a simple heuristic based on the
   // total number of writes. The FileGroup's allocation should spread the writes across
   // disks somewhat, but doesn't guarantee we're fully using all available disks. In
@@ -549,7 +562,7 @@ void BufferPool::Client::WriteDirtyPagesAsync(int64_t min_bytes_to_write) {
       * file_group_->tmp_file_mgr()->NumActiveTmpDevices();
 
   int64_t bytes_written = 0;
-  while (bytes_written < writeable_bytes
+  while (!dirty_unpinned_pages_.empty()
       && (bytes_written < min_bytes_to_write
           || in_flight_write_pages_.size() < target_writes)) {
     Page* page = dirty_unpinned_pages_.tail(); // LIFO.
@@ -577,7 +590,6 @@ void BufferPool::Client::WriteDirtyPagesAsync(int64_t min_bytes_to_write) {
     DCHECK_EQ(tmp, page);
     in_flight_write_pages_.Enqueue(page);
     bytes_written += page->len;
-    in_flight_write_bytes_ += page->len;
   }
 }
 
@@ -593,8 +605,6 @@ void BufferPool::Client::WriteCompleteCallback(Page* page, const Status& write_s
   // repurposed by other clients and 'write_status_' must be checked by this client
   // before reading back the bad data.
   pool_->AddCleanPage(cl, page);
-  dirty_unpinned_bytes_ -= page->len;
-  in_flight_write_bytes_ -= page->len;
   WriteDirtyPagesAsync(); // Start another asynchronous write if needed.
 
   // Notify before releasing lock to avoid race with Page and Client destruction.
@@ -614,10 +624,12 @@ void BufferPool::Client::WaitForWrite(unique_lock<mutex>* client_lock, Page* pag
 string BufferPool::Client::DebugString() {
   lock_guard<mutex> lock(lock_);
   stringstream ss;
-  ss << Substitute("<BufferPool::Client> $0 name: $1 write_status: $2 num_pages: $3 "
-      "dirty_unpinned_bytes: $4 in_flight_write_bytes: $5 reservation: {$6}",
-      this, name_, write_status_.GetDetail(), num_pages_, dirty_unpinned_bytes_,
-      in_flight_write_bytes_, reservation_.DebugString());
+  ss << Substitute("<BufferPool::Client> $0 name: $1 write_status: $2 "
+      "buffers allocated $3 num_pages: $4 pinned_bytes: $5 "
+      "dirty_unpinned_bytes: $6 in_flight_write_bytes: $7 reservation: {$8}",
+      this, name_, write_status_.GetDetail(), buffers_allocated_bytes_, num_pages_,
+      pinned_pages_.bytes(), dirty_unpinned_pages_.bytes(),
+      in_flight_write_pages_.bytes(), reservation_.DebugString());
   ss << "\n  " << pinned_pages_.size() << " pinned pages: ";
   pinned_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
   ss << "\n  " << dirty_unpinned_pages_.size() << " dirty unpinned pages: ";


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/874d20d0/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index b199637..da8ef3c 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -248,6 +248,7 @@ class BufferPool : public CacheLineAligned {
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferPool);
 
+  class PageList;
   struct Page;
 
   /// Allocate a buffer of length 'len'. Assumes that the client's reservation has already
   /// been consumed for the buffer. Returns an error if the pool is unable to fulfill the


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/874d20d0/be/src/util/internal-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/internal-queue.h b/be/src/util/internal-queue.h
index 5e32116..b295b1f 100644
--- a/be/src/util/internal-queue.h
+++ b/be/src/util/internal-queue.h
@@ -154,11 +154,10 @@ class InternalQueueBase {
   }
 
   /// Removes 'node' from the queue. This is O(1). No-op if node is
-  /// not on the list.
-  void Remove(T* n) {
+  /// not on the list. Returns true if removed.
+  bool Remove(T* n) {
     Node* node = (Node*)n;
-    if (node->parent_queue == NULL) return;
-    DCHECK(node->parent_queue == this);
+    if (node->parent_queue != this) return false;
     {
       boost::lock_guard<LockType> lock(lock_);
       if (node->next == NULL && node->prev == NULL) {
@@ -168,7 +167,7 @@ class InternalQueueBase {
         head_ = tail_ = NULL;
         --size_;
         node->parent_queue = NULL;
-        return;
+        return true;
       }
 
       if (head_ == node) {
@@ -189,6 +188,7 @@ class InternalQueueBase {
     }
     node->next = node->prev = NULL;
     node->parent_queue = NULL;
+    return true;
   }
 
   /// Clears all elements in the list.

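The internal-queue.h change above lets callers collapse the old Contains()-then-Remove() pair into a single call, as BufferPool::RemoveCleanPage() now does. Here is a minimal, self-contained sketch of the new contract; FakeQueue and the toy single-node "queue" are illustrative stand-ins, not the actual InternalQueue implementation.

  #include <cassert>

  // Hypothetical stand-in for the new Remove() contract: removing a node that
  // is not on this queue is a no-op that returns false.
  struct FakeQueue {
    int* only_member = nullptr;  // Toy queue holding at most one node.
    bool Remove(int* n) {
      if (n != only_member) return false;  // Not on this queue: no-op, report it.
      only_member = nullptr;               // On this queue: unlink it.
      return true;
    }
  };

  int main() {
    int page = 0;
    FakeQueue clean_pages;
    clean_pages.only_member = &page;

    // One call both removes and reports membership, replacing the old
    // Contains()/Remove() pair under the lock.
    assert(clean_pages.Remove(&page));   // First removal succeeds.
    assert(!clean_pages.Remove(&page));  // Second is a no-op returning false.
    return 0;
  }

Folding the membership test into Remove() also avoids walking the list twice per call, since the real InternalQueue checks the node's parent_queue pointer rather than scanning.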