Repository: incubator-impala Updated Branches: refs/heads/master 060b80fd9 -> cb37be893
IMPALA-5169: Add support for async pins in buffer pool Makes Pin() do async reads behind-the-scenes, instead of blocking until the read completes. The blocking is done instead when the client tries to access the buffer via PageHandle::GetBuffer() or ExtractBuffer(). This is implemented with a new sub-state of "pinned" where the page has a buffer and consumes reservation but the buffer does not contain valid data. Motivation: This unlocks various opportunities to overlap read I/Os with other work: * Reads to different disks can execute in parallel * I/O and computation can be overlapped. This initially benefits BufferedTupleStream::PinStream(), where many pages are pinned at once. With this change the reads run asynchronously. This can potentially lead to large speedups when spilling. E.g. if the pages for a Hash Join's partition are spread across 10 disks, we could get 10x the read throughput, plus overlap the I/O with hash table build. In future we can use this to do read-ahead over unpinned BufferedTupleStreams or for unpinned Runs in Sorter, but this requires changes to the client code to Pin() pages in advance. Testing: * BufferedTupleStreamV2 already exercises this. * Various BufferPool tests already exercise this. * Added a basic test to cover edge cases made possible by the new state transitions. * Extended the randomised test to cover this. 
Change-Id: Ibdf074c1ac4405d6f08d623ba438a85f7d39fd79 Reviewed-on: http://gerrit.cloudera.org:8080/6612 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cb37be89 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cb37be89 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cb37be89 Branch: refs/heads/master Commit: cb37be8935579263122088e2943d014c4a9aba21 Parents: 060b80f Author: Tim Armstrong <[email protected]> Authored: Tue Apr 11 18:08:39 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu May 11 19:36:09 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/buffered-tuple-stream-v2.cc | 62 ++++++-- be/src/runtime/buffered-tuple-stream-v2.h | 29 +++- .../runtime/bufferpool/buffer-pool-internal.h | 62 +++++--- be/src/runtime/bufferpool/buffer-pool-test.cc | 158 +++++++++++++++---- be/src/runtime/bufferpool/buffer-pool.cc | 108 +++++++++---- be/src/runtime/bufferpool/buffer-pool.h | 55 ++++--- be/src/runtime/tmp-file-mgr.cc | 74 ++++++--- be/src/runtime/tmp-file-mgr.h | 32 +++- 8 files changed, 425 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/buffered-tuple-stream-v2.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc b/be/src/runtime/buffered-tuple-stream-v2.cc index 083b59e..c36c31f 100644 --- a/be/src/runtime/buffered-tuple-stream-v2.cc +++ b/be/src/runtime/buffered-tuple-stream-v2.cc @@ -44,6 +44,8 @@ using namespace impala; using namespace strings; +using BufferHandle = BufferPool::BufferHandle; + BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state, const 
RowDescriptor& row_desc, BufferPool::ClientHandle* buffer_pool_client, int64_t page_len, const set<SlotId>& ext_varlen_slots) @@ -54,6 +56,7 @@ BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state, total_byte_size_(0), read_page_rows_returned_(-1), read_ptr_(nullptr), + read_end_ptr_(nullptr), write_ptr_(nullptr), write_end_ptr_(nullptr), rows_returned_(0), @@ -109,15 +112,20 @@ void BufferedTupleStreamV2::CheckConsistency() const { } if (has_write_iterator()) { DCHECK(write_page_->is_pinned()); - DCHECK_GE(write_ptr_, write_page_->data()); - DCHECK_EQ(write_end_ptr_, write_page_->data() + write_page_->len()); + DCHECK(write_page_->retrieved_buffer); + const BufferHandle* write_buffer; + Status status = write_page_->GetBuffer(&write_buffer); + DCHECK(status.ok()); // Write buffer should never have been unpinned. + DCHECK_GE(write_ptr_, write_buffer->data()); + DCHECK_EQ(write_end_ptr_, write_buffer->data() + write_page_->len()); DCHECK_GE(write_end_ptr_, write_ptr_); } if (has_read_iterator()) { DCHECK(read_page_->is_pinned()); - uint8_t* read_end_ptr = read_page_->data() + read_page_->len(); - DCHECK_GE(read_ptr_, read_page_->data()); - DCHECK_GE(read_end_ptr, read_ptr_); + DCHECK(read_page_->retrieved_buffer); + // Can't check read buffer without affecting behaviour, because a read may be in + // flight and this would require blocking on that read. + DCHECK_GE(read_end_ptr_, read_ptr_); } } @@ -186,9 +194,13 @@ Status BufferedTupleStreamV2::PrepareForReadWrite( void BufferedTupleStreamV2::Close(RowBatch* batch, RowBatch::FlushMode flush) { for (Page& page : pages_) { - if (batch != nullptr && page.is_pinned()) { + if (batch != nullptr && page.retrieved_buffer) { + // Subtle: We only need to attach buffers from pages that we may have returned + // references to. ExtractBuffer() cannot fail for these pages because the data + // is guaranteed to already be in memory. 
BufferPool::BufferHandle buffer; - buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer); + Status status = buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer); + DCHECK(status.ok()); batch->AddBuffer(buffer_pool_client_, move(buffer), flush); } else { buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle); @@ -247,6 +259,7 @@ void BufferedTupleStreamV2::UnpinPageIfNeeded(Page* page, bool stream_pinned) { DCHECK_EQ(new_pin_count, page->pin_count() - 1); buffer_pool_->Unpin(buffer_pool_client_, &page->handle); bytes_pinned_ -= page->len(); + if (page->pin_count() == 0) page->retrieved_buffer = false; } } @@ -255,16 +268,17 @@ Status BufferedTupleStreamV2::NewWritePage() noexcept { DCHECK(!has_write_iterator()); Page new_page; - RETURN_IF_ERROR( - buffer_pool_->CreatePage(buffer_pool_client_, page_len_, &new_page.handle)); + const BufferHandle* write_buffer; + RETURN_IF_ERROR(buffer_pool_->CreatePage( + buffer_pool_client_, page_len_, &new_page.handle, &write_buffer)); bytes_pinned_ += page_len_; total_byte_size_ += page_len_; pages_.push_back(std::move(new_page)); write_page_ = &pages_.back(); DCHECK_EQ(write_page_->num_rows, 0); - write_ptr_ = write_page_->data(); - write_end_ptr_ = write_page_->data() + page_len_; + write_ptr_ = write_buffer->data(); + write_end_ptr_ = write_ptr_ + page_len_; return Status::OK(); } @@ -312,6 +326,8 @@ void BufferedTupleStreamV2::ResetWritePage() { // Unpin the write page if we're reading in unpinned mode. Page* prev_write_page = write_page_; write_page_ = nullptr; + write_ptr_ = nullptr; + write_end_ptr_ = nullptr; // May need to decrement pin count now that it's not the write page, depending on // the stream's mode. @@ -347,8 +363,13 @@ Status BufferedTupleStreamV2::NextReadPage() { // actually fail once we have variable-length pages. RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_)); + // This waits for the pin to complete if the page was unpinned earlier. 
+ const BufferHandle* read_buffer; + RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer)); + read_page_rows_returned_ = 0; - read_ptr_ = read_page_->data(); + read_ptr_ = read_buffer->data(); + read_end_ptr_ = read_ptr_ + read_buffer->len(); CHECK_CONSISTENCY(); return Status::OK(); @@ -359,6 +380,8 @@ void BufferedTupleStreamV2::ResetReadPage() { // Unpin the write page if we're reading in unpinned mode. Page* prev_read_page = &*read_page_; read_page_ = pages_.end(); + read_ptr_ = nullptr; + read_end_ptr_ = nullptr; // May need to decrement pin count after destroying read iterator. UnpinPageIfNeeded(prev_read_page, pinned_); @@ -384,10 +407,15 @@ Status BufferedTupleStreamV2::PrepareForReadInternal(bool delete_on_read) { read_page_ = pages_.begin(); RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_)); + // This waits for the pin to complete if the page was unpinned earlier. + const BufferHandle* read_buffer; + RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer)); + DCHECK(has_read_iterator()); DCHECK(read_page_->is_pinned()); read_page_rows_returned_ = 0; - read_ptr_ = read_page_->data(); + read_ptr_ = read_buffer->data(); + read_end_ptr_ = read_ptr_ + read_buffer->len(); rows_returned_ = 0; delete_on_read_ = delete_on_read; CHECK_CONSISTENCY(); @@ -411,6 +439,8 @@ Status BufferedTupleStreamV2::PinStream(bool* pinned) { if (!reservation_granted) return Status::OK(); // At this point success is guaranteed - go through to pin the pages we need to pin. + // If the page data was evicted from memory, the read I/O can happen in parallel + // because we defer calling GetBuffer() until NextReadPage(). 
for (Page& page : pages_) RETURN_IF_ERROR(PinPageIfNeeded(&page, true)); pinned_ = true; @@ -556,7 +586,7 @@ Status BufferedTupleStreamV2::GetNextInternal( batch->MarkNeedsDeepCopy(); } if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill); - DCHECK_LE(read_ptr_, read_page_->data() + read_page_->len()); + DCHECK_LE(read_ptr_, read_end_ptr_); return Status::OK(); } @@ -567,7 +597,7 @@ void BufferedTupleStreamV2::FixUpStringsForRead( if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset()); - DCHECK_LE(read_ptr_ + sv->len, read_page_->data() + read_page_->len()); + DCHECK_LE(read_ptr_ + sv->len, read_end_ptr_); sv->ptr = reinterpret_cast<char*>(read_ptr_); read_ptr_ += sv->len; } @@ -582,7 +612,7 @@ void BufferedTupleStreamV2::FixUpCollectionsForRead( CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset()); const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor(); int coll_byte_size = cv->num_tuples * item_desc.byte_size(); - DCHECK_LE(read_ptr_ + coll_byte_size, read_page_->data() + read_page_->len()); + DCHECK_LE(read_ptr_ + coll_byte_size, read_end_ptr_); cv->ptr = reinterpret_cast<uint8_t*>(read_ptr_); read_ptr_ += coll_byte_size; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/buffered-tuple-stream-v2.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-v2.h b/be/src/runtime/buffered-tuple-stream-v2.h index d707604..e5fea47 100644 --- a/be/src/runtime/buffered-tuple-stream-v2.h +++ b/be/src/runtime/buffered-tuple-stream-v2.h @@ -152,7 +152,9 @@ class TupleRow; /// or false respectively) before advancing to the next page. /// 2. Pinned: All pages in the stream are pinned so do not need to be pinned or /// unpinned when reading from the stream. If delete on read is true, pages are -/// deleted after being read. +/// deleted after being read. 
If the stream was previously unpinned, the page's data +/// may not yet be in memory - reading from the stream can block on I/O or fail with +/// an I/O error. /// Write: /// 1. Unpinned: Unpin pages as they fill up. This means that only a enough reservation /// to pin a single write page is required to write to the stream, regardless of the @@ -314,10 +316,9 @@ class BufferedTupleStreamV2 { bool* got_rows) WARN_UNUSED_RESULT; /// Must be called once at the end to cleanup all resources. If 'batch' is non-NULL, - /// attaches buffers from any pinned pages to the batch and deletes unpinned - /// pages. Otherwise deletes all pages. Does nothing if the stream was already - /// closed. The 'flush' mode is forwarded to RowBatch::AddBuffer() when attaching - /// buffers. + /// attaches buffers from pinned pages that rows returned from GetNext() may reference. + /// Otherwise deletes all pages. Does nothing if the stream was already closed. The + /// 'flush' mode is forwarded to RowBatch::AddBuffer() when attaching buffers. void Close(RowBatch* batch, RowBatch::FlushMode flush); /// Number of rows in the stream. @@ -354,18 +355,27 @@ class BufferedTupleStreamV2 { /// Wrapper around BufferPool::PageHandle that tracks additional info about the page. struct Page { - Page() : num_rows(0) {} + Page() : num_rows(0), retrieved_buffer(true) {} inline int len() const { return handle.len(); } - inline uint8_t* data() const { return handle.data(); } inline bool is_pinned() const { return handle.is_pinned(); } inline int pin_count() const { return handle.pin_count(); } + Status GetBuffer(const BufferPool::BufferHandle** buffer) { + RETURN_IF_ERROR(handle.GetBuffer(buffer)); + retrieved_buffer = true; + return Status::OK(); + } std::string DebugString() const; BufferPool::PageHandle handle; /// Number of rows written to the page. int num_rows; + + /// Whether we called GetBuffer() on the page since it was last pinned. 
This means + /// that GetBuffer() and ExtractBuffer() cannot fail and that GetNext() may have + /// returned rows referencing the page's buffer. + bool retrieved_buffer; }; /// Runtime state instance used to check for cancellation. Not owned. @@ -415,11 +425,14 @@ class BufferedTupleStreamV2 { /// Pointer into read_page_ to the byte after the last row read. uint8_t* read_ptr_; + /// Pointer to one byte past the end of read_page_. Used to detect overruns. + const uint8_t* read_end_ptr_; + /// Pointer into write_page_ to the byte after the last row written. uint8_t* write_ptr_; /// Pointer to one byte past the end of write_page_. Cached to speed up computation - uint8_t* write_end_ptr_; + const uint8_t* write_end_ptr_; /// Number of rows returned to the caller from GetNext() since the last /// PrepareForRead() call. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/bufferpool/buffer-pool-internal.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h index 619288b..0c0408b 100644 --- a/be/src/runtime/bufferpool/buffer-pool-internal.h +++ b/be/src/runtime/bufferpool/buffer-pool-internal.h @@ -37,15 +37,21 @@ /// Each Page object is owned by at most one InternalList<Page> at any given point. /// Each page is either pinned or unpinned. Unpinned has a number of sub-states, which /// is determined by which list in Client/BufferPool contains the page. -/// * Pinned: Always in this state when 'pin_count' > 0. The page is in -/// Client::pinned_pages_. -/// * Unpinned - Dirty: When no write has been started for an unpinned page. The page is -/// in Client::dirty_unpinned_pages_. -/// * Unpinned - Write in flight: When the write has been started but not completed for -/// a dirty unpinned page. The page is in Client::write_in_flight_pages_. For -/// accounting purposes this is considered a dirty page. 
-/// * Unpinned - Clean: When the write has completed but the page was not evicted. The -/// page is in a clean pages list in a BufferAllocator arena. +/// * Pinned: Always in this state when 'pin_count' > 0. The page has a buffer and is in +/// Client::pinned_pages_. 'pin_in_flight' determines which sub-state the page is in: +/// -> When pin_in_flight=false, the buffer contains the page's data and the client can +/// read and write to the buffer. +/// -> When pin_in_flight=true, the page's data is in the process of being read from +/// scratch disk into the buffer. Clients will block on the read I/O if they attempt +/// to access the buffer. +/// * Unpinned - Dirty: When no write to scratch has been started for an unpinned page. +/// The page is in Client::dirty_unpinned_pages_. +/// * Unpinned - Write in flight: When the write to scratch has been started but not +/// completed for a dirty unpinned page. The page is in +/// Client::write_in_flight_pages_. For accounting purposes this is considered a +/// dirty page. +/// * Unpinned - Clean: When the write to scratch has completed but the page was not +/// evicted. The page is in a clean pages list in a BufferAllocator arena. /// * Unpinned - Evicted: After a clean page's buffer has been reclaimed. The page is /// not in any list. /// @@ -91,7 +97,8 @@ namespace impala { /// The internal representation of a page, which can be pinned or unpinned. See the /// class comment for explanation of the different page states. struct BufferPool::Page : public InternalList<Page>::Node { - Page(Client* client, int64_t len) : client(client), len(len), pin_count(0) {} + Page(Client* client, int64_t len) + : client(client), len(len), pin_count(0), pin_in_flight(false) {} std::string DebugString(); @@ -108,6 +115,11 @@ struct BufferPool::Page : public InternalList<Page>::Node { /// PageHandle, so it cannot be accessed by multiple threads concurrently. 
int pin_count; + /// True if the read I/O to pin the page was started but not completed. Only accessed + /// in contexts that are passed the associated PageHandle, so it cannot be accessed + /// by multiple threads concurrently. + bool pin_in_flight; + /// Non-null if there is a write in flight, the page is clean, or the page is evicted. std::unique_ptr<TmpFileMgr::WriteHandle> write_handle; @@ -211,10 +223,20 @@ class BufferPool::Client { void MoveToDirtyUnpinned(Page* page); /// Move an unpinned page to the pinned state, moving between data structures and - /// reading from disk if necessary. Returns once the page's buffer is allocated - /// and contains the page's data. Neither the client's lock nor - /// handle->page_->buffer_lock should be held by the caller. - Status MoveToPinned(ClientHandle* client, PageHandle* handle); + /// reading from disk if necessary. Ensures the page has a buffer. If the data is + /// already in memory, ensures the data is in the page's buffer. If the data is on + /// disk, starts an async read of the data and sets 'pin_in_flight' on the page to + /// true. Neither the client's lock nor page->buffer_lock should be held by the caller. + Status StartMoveToPinned(ClientHandle* client, Page* page); + + /// Moves a page that has a pin in flight back to the evicted state, undoing + /// StartMoveToPinned(). Neither the client's lock nor page->buffer_lock should be held + /// by the caller. + void UndoMoveEvictedToPinned(Page* page); + + /// Finish the work of bringing the data of an evicted page to memory if + /// page->pin_in_flight was set to true by StartMoveToPinned(). + Status FinishMoveEvictedToPinned(Page* page); /// Must be called once before allocating a buffer of 'len' via the AllocateBuffer() /// API to deduct from the client's reservation and update internal accounting. Cleans @@ -285,12 +307,12 @@ class BufferPool::Client { /// Called when a write for 'page' completes. 
void WriteCompleteCallback(Page* page, const Status& write_status); - /// Move an evicted page to the pinned state by allocating a new buffer, reading data - /// from disk and moving the page to 'pinned_pages_'. client->impl must be locked by - /// the caller via 'client_lock' and handle->page must be unlocked. 'client_lock' is - /// released then reacquired. - Status MoveEvictedToPinned(boost::unique_lock<boost::mutex>* client_lock, - ClientHandle* client, PageHandle* handle); + /// Move an evicted page to the pinned state by allocating a new buffer, starting an + /// async read from disk and moving the page to 'pinned_pages_'. client->impl must be + /// locked by the caller via 'client_lock' and handle->page must be unlocked. + /// 'client_lock' is released then reacquired. + Status StartMoveEvictedToPinned( + boost::unique_lock<boost::mutex>* client_lock, ClientHandle* client, Page* page); /// The buffer pool that owns the client. BufferPool* const pool_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/bufferpool/buffer-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc index 6b9177e..27a2ea9 100644 --- a/be/src/runtime/bufferpool/buffer-pool-test.cc +++ b/be/src/runtime/bufferpool/buffer-pool-test.cc @@ -266,7 +266,7 @@ class BufferPoolTest : public ::testing::Test { void WriteOrVerifyData(const T& object, int val, bool write) { // Only write sentinel values to start and end of buffer to make writing and // verification cheap. 
- MemRange mem = object.mem_range(); + MemRange mem = GetMemRange(object); uint64_t* start_word = reinterpret_cast<uint64_t*>(mem.data()); uint64_t* end_word = reinterpret_cast<uint64_t*>(&mem.data()[mem.len() - sizeof(uint64_t)]); @@ -279,6 +279,14 @@ class BufferPoolTest : public ::testing::Test { } } + MemRange GetMemRange(const BufferHandle& buffer) { return buffer.mem_range(); } + + MemRange GetMemRange(const PageHandle& page) { + const BufferHandle* buffer; + EXPECT_OK(page.GetBuffer(&buffer)); + return buffer->mem_range(); + } + /// Return the total number of bytes allocated from the system currently. int64_t SystemBytesAllocated(BufferPool* pool) { return pool->allocator()->GetSystemBytesAllocated(); @@ -329,6 +337,11 @@ class BufferPoolTest : public ::testing::Test { LOG(INFO) << "Injected fault by truncating file " << path; } + // Return whether a pin is in flight for the page. + static bool PinInFlight(PageHandle* page) { + return page->page_->pin_in_flight; + } + // Return the path of the temporary file backing the page. static string TmpFilePath(PageHandle* page) { return page->page_->write_handle->TmpFilePath(); @@ -505,11 +518,11 @@ TEST_F(BufferPoolTest, PageCreation) { ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i])); ASSERT_TRUE(handles[i].is_open()); ASSERT_TRUE(handles[i].is_pinned()); - ASSERT_TRUE(handles[i].buffer_handle() != NULL); - ASSERT_TRUE(handles[i].data() != NULL); - ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data()); + const BufferHandle* buffer; + ASSERT_OK(handles[i].GetBuffer(&buffer)); + ASSERT_TRUE(buffer->data() != NULL); ASSERT_EQ(handles[i].len(), page_len); - ASSERT_EQ(handles[i].buffer_handle()->len(), page_len); + ASSERT_EQ(buffer->len(), page_len); ASSERT_EQ(client.GetUsedReservation(), used_before + page_len); } @@ -623,14 +636,15 @@ TEST_F(BufferPoolTest, Pin) { BufferPool::PageHandle handle1, handle2; // Can pin two minimum sized pages. 
- ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1)); + const BufferHandle* page_buffer; + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1, &page_buffer)); ASSERT_TRUE(handle1.is_open()); ASSERT_TRUE(handle1.is_pinned()); - ASSERT_TRUE(handle1.data() != NULL); - ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2)); + ASSERT_TRUE(page_buffer->data() != NULL); + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2, &page_buffer)); ASSERT_TRUE(handle2.is_open()); ASSERT_TRUE(handle2.is_pinned()); - ASSERT_TRUE(handle2.data() != NULL); + ASSERT_TRUE(page_buffer->data() != NULL); pool.Unpin(&client, &handle2); ASSERT_FALSE(handle2.is_pinned()); @@ -646,10 +660,10 @@ TEST_F(BufferPoolTest, Pin) { // Can pin double-sized page only once. BufferPool::PageHandle double_handle; - ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle)); + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle, &page_buffer)); ASSERT_TRUE(double_handle.is_open()); ASSERT_TRUE(double_handle.is_pinned()); - ASSERT_TRUE(double_handle.data() != NULL); + ASSERT_TRUE(page_buffer->data() != NULL); // Destroy the pages - test destroying both pinned and unpinned. pool.DestroyPage(&client, &handle1); @@ -659,6 +673,78 @@ TEST_F(BufferPoolTest, Pin) { pool.DeregisterClient(&client); } +// Test the various state transitions possible with async Pin() calls. +TEST_F(BufferPoolTest, AsyncPin) { + const int DATA_SEED = 1234; + // Set up pool with enough reservation to keep two buffers in memory. 
+ const int64_t TOTAL_MEM = 2 * TEST_BUFFER_LEN; + BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM); + global_reservations_.InitRootTracker(NULL, TOTAL_MEM); + BufferPool::ClientHandle client; + ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, + NULL, TOTAL_MEM, NewProfile(), &client)); + ASSERT_TRUE(client.IncreaseReservationToFit(TOTAL_MEM)); + + PageHandle handle; + const BufferHandle* buffer; + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle, &buffer)); + WriteData(*buffer, DATA_SEED); + // Pin() on a pinned page just increments the pin count. + ASSERT_OK(pool.Pin(&client, &handle)); + EXPECT_EQ(2, handle.pin_count()); + EXPECT_FALSE(PinInFlight(&handle)); + + pool.Unpin(&client, &handle); + pool.Unpin(&client, &handle); + ASSERT_FALSE(handle.is_pinned()); + + // Calling Pin() then Pin() results in double-pinning. + ASSERT_OK(pool.Pin(&client, &handle)); + ASSERT_OK(pool.Pin(&client, &handle)); + EXPECT_EQ(2, handle.pin_count()); + EXPECT_FALSE(PinInFlight(&handle)); + + pool.Unpin(&client, &handle); + pool.Unpin(&client, &handle); + ASSERT_FALSE(handle.is_pinned()); + + // Pin() on a page that isn't evicted pins it immediately. + ASSERT_OK(pool.Pin(&client, &handle)); + EXPECT_EQ(1, handle.pin_count()); + EXPECT_FALSE(PinInFlight(&handle)); + VerifyData(handle, 1234); + pool.Unpin(&client, &handle); + ASSERT_FALSE(handle.is_pinned()); + + // Force eviction. Pin() on an evicted page starts the read asynchronously. + ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM)); + ASSERT_OK(pool.Pin(&client, &handle)); + EXPECT_EQ(1, handle.pin_count()); + EXPECT_TRUE(PinInFlight(&handle)); + // Block on the pin and verify the buffer. + ASSERT_OK(handle.GetBuffer(&buffer)); + EXPECT_FALSE(PinInFlight(&handle)); + VerifyData(*buffer, 1234); + + // Test that we can unpin while in flight and the data remains valid. 
+ pool.Unpin(&client, &handle); + ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM)); + ASSERT_OK(pool.Pin(&client, &handle)); + EXPECT_TRUE(PinInFlight(&handle)); + pool.Unpin(&client, &handle); + ASSERT_OK(pool.Pin(&client, &handle)); + ASSERT_OK(handle.GetBuffer(&buffer)); + VerifyData(*buffer, 1234); + + // Evict the page, then destroy while we're pinning it asynchronously. + pool.Unpin(&client, &handle); + ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM)); + ASSERT_OK(pool.Pin(&client, &handle)); + pool.DestroyPage(&client, &handle); + + pool.DeregisterClient(&client); +} + /// Creating a page or pinning without sufficient reservation should DCHECK. TEST_F(BufferPoolTest, PinWithoutReservation) { int64_t total_mem = TEST_BUFFER_LEN * 1024; @@ -698,9 +784,10 @@ TEST_F(BufferPoolTest, ExtractBuffer) { // Test basic buffer extraction. for (int len = TEST_BUFFER_LEN; len <= 2 * TEST_BUFFER_LEN; len *= 2) { - ASSERT_OK(pool.CreatePage(&client, len, &page)); - uint8_t* page_data = page.data(); - pool.ExtractBuffer(&client, &page, &buffer); + const BufferHandle* page_buffer; + ASSERT_OK(pool.CreatePage(&client, len, &page, &page_buffer)); + uint8_t* page_data = page_buffer->data(); + ASSERT_OK(pool.ExtractBuffer(&client, &page, &buffer)); ASSERT_FALSE(page.is_open()); ASSERT_TRUE(buffer.is_open()); ASSERT_EQ(len, buffer.len()); @@ -711,11 +798,12 @@ TEST_F(BufferPoolTest, ExtractBuffer) { } // Test that ExtractBuffer() accounts correctly for pin count > 1. 
- ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page)); - uint8_t* page_data = page.data(); + const BufferHandle* page_buffer; + ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page, &page_buffer)); + uint8_t* page_data = page_buffer->data(); ASSERT_OK(pool.Pin(&client, &page)); ASSERT_EQ(TEST_BUFFER_LEN * 2, client.GetUsedReservation()); - pool.ExtractBuffer(&client, &page, &buffer); + ASSERT_OK(pool.ExtractBuffer(&client, &page, &buffer)); ASSERT_EQ(TEST_BUFFER_LEN, client.GetUsedReservation()); ASSERT_FALSE(page.is_open()); ASSERT_TRUE(buffer.is_open()); @@ -727,7 +815,7 @@ TEST_F(BufferPoolTest, ExtractBuffer) { // Test that ExtractBuffer() DCHECKs for unpinned pages. ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page)); pool.Unpin(&client, &page); - IMPALA_ASSERT_DEBUG_DEATH(pool.ExtractBuffer(&client, &page, &buffer), ""); + IMPALA_ASSERT_DEBUG_DEATH((void)pool.ExtractBuffer(&client, &page, &buffer), ""); pool.DestroyPage(&client, &page); pool.DeregisterClient(&client); @@ -871,15 +959,17 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) { } // Create a pinned and unpinned page for the first client. - BufferPool::PageHandle handle1, handle2; - ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle1)); + PageHandle handle1, handle2; + const BufferHandle* page_buffer; + ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle1, &page_buffer)); const uint8_t TEST_VAL = 123; - memset(handle1.data(), TEST_VAL, handle1.len()); // Fill page with an arbitrary value. + memset( + page_buffer->data(), TEST_VAL, handle1.len()); // Fill page with an arbitrary value. pool.Unpin(&clients[0], &handle1); ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle2)); // Allocating a buffer for the second client requires evicting the unpinned page. 
- BufferPool::BufferHandle buffer; + BufferHandle buffer; ASSERT_OK(pool.AllocateBuffer(&clients[1], TEST_BUFFER_LEN, &buffer)); ASSERT_TRUE(IsEvicted(&handle1)); @@ -887,7 +977,10 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) { pool.Unpin(&clients[0], &handle2); ASSERT_OK(pool.Pin(&clients[0], &handle1)); ASSERT_TRUE(IsEvicted(&handle2)); - for (int i = 0; i < handle1.len(); ++i) EXPECT_EQ(TEST_VAL, handle1.data()[i]) << i; + ASSERT_OK(handle1.GetBuffer(&page_buffer)); + for (int i = 0; i < handle1.len(); ++i) { + EXPECT_EQ(TEST_VAL, page_buffer->data()[i]) << i; + } // Clean up everything. pool.DestroyPage(&clients[0], &handle1); @@ -1438,7 +1531,10 @@ TEST_F(BufferPoolTest, ScratchReadError) { DCHECK_EQ(error_type, TRUNCATE); TruncateBackingFile(tmp_file); } - Status status = pool.Pin(&client, &page); + ASSERT_OK(pool.Pin(&client, &page)); + // The read is async, so won't bubble up until we block on it with GetBuffer(). + const BufferHandle* page_buffer; + Status status = page.GetBuffer(&page_buffer); if (error_type == CORRUPT_DATA && !FLAGS_disk_spill_encryption) { // Without encryption we can't detect that the data changed. EXPECT_OK(status); @@ -1662,7 +1758,8 @@ void BufferPoolTest::TestRandomInternalImpl(BufferPool* pool, FileGroup* file_gr if ((i % 10000) == 0) LOG(ERROR) << " Iteration " << i << endl; // Pick an operation. // New page: 15% - // Pin a page: 30% + // Pin a page and block waiting for the result: 20% + // Pin a page and let it continue asynchronously: 10% // Unpin a pinned page: 25% (< Pin prob. so that memory consumption increases). // Destroy page: 10% (< New page prob. so that number of pages grows over time). // Allocate buffer: 10% @@ -1679,24 +1776,29 @@ void BufferPoolTest::TestRandomInternalImpl(BufferPool* pool, FileGroup* file_gr WriteData(new_page, data); pages.emplace_back(move(new_page), data); } else if (p < 0.45) { - // Pin a page. + // Pin a page asynchronously. 
if (pages.empty()) continue; int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng); PageHandle* page = &pages[rand_pick].first; if (!client.IncreaseReservationToFit(page->len())) continue; if (!page->is_pinned() || multiple_pins) ASSERT_OK(pool->Pin(&client, page)); - VerifyData(*page, pages[rand_pick].second); + // Block on the pin and verify data for sync pins. + if (p < 0.35) VerifyData(*page, pages[rand_pick].second); } else if (p < 0.70) { // Unpin a pinned page. if (pages.empty()) continue; int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng); PageHandle* page = &pages[rand_pick].first; - if (page->is_pinned()) pool->Unpin(&client, page); + if (page->is_pinned()) { + VerifyData(*page, pages[rand_pick].second); + pool->Unpin(&client, page); + } } else if (p < 0.80) { // Destroy a page. if (pages.empty()) continue; int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng); auto page_data = move(pages[rand_pick]); + if (page_data.first.is_pinned()) VerifyData(page_data.first, page_data.second); pages[rand_pick] = move(pages.back()); pages.pop_back(); pool->DestroyPage(&client, &page_data.first); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/bufferpool/buffer-pool.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc index 1d2f1d3..c1bde5f 100644 --- a/be/src/runtime/bufferpool/buffer-pool.cc +++ b/be/src/runtime/bufferpool/buffer-pool.cc @@ -90,11 +90,18 @@ int64_t BufferPool::PageHandle::len() const { return page_->len; // Does not require locking. 
} -const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const { +Status BufferPool::PageHandle::GetBuffer(const BufferHandle** buffer) const { + DCHECK(is_open()); + DCHECK(client_->is_registered()); DCHECK(is_pinned()); - // The 'buffer' field cannot change while the page is pinned, so it is safe to access - // without locking. - return &page_->buffer; + if (page_->pin_in_flight) { + // Finish the work started in Pin(). + RETURN_IF_ERROR(client_->impl_->FinishMoveEvictedToPinned(page_)); + } + DCHECK(!page_->pin_in_flight); + *buffer = &page_->buffer; + DCHECK((*buffer)->is_open()); + return Status::OK(); } BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit) @@ -123,16 +130,18 @@ void BufferPool::DeregisterClient(ClientHandle* client) { client->impl_ = NULL; } -Status BufferPool::CreatePage(ClientHandle* client, int64_t len, PageHandle* handle) { +Status BufferPool::CreatePage( + ClientHandle* client, int64_t len, PageHandle* handle, const BufferHandle** buffer) { DCHECK(!handle->is_open()); DCHECK_GE(len, min_buffer_len_); DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len)); - BufferHandle buffer; + BufferHandle new_buffer; // No changes have been made to state yet, so we can cleanly return on error. - RETURN_IF_ERROR(AllocateBuffer(client, len, &buffer)); - Page* page = client->impl_->CreatePinnedPage(move(buffer)); + RETURN_IF_ERROR(AllocateBuffer(client, len, &new_buffer)); + Page* page = client->impl_->CreatePinnedPage(move(new_buffer)); handle->Open(page, client); + if (buffer != nullptr) *buffer = &page->buffer; return Status::OK(); } @@ -140,10 +149,16 @@ void BufferPool::DestroyPage(ClientHandle* client, PageHandle* handle) { if (!handle->is_open()) return; // DestroyPage() should be idempotent. if (handle->is_pinned()) { + // Cancel the read I/O - we don't need the data any more. 
+ if (handle->page_->pin_in_flight) { + handle->page_->write_handle->CancelRead(); + handle->page_->pin_in_flight = false; + } // In the pinned case, delegate to ExtractBuffer() and FreeBuffer() to do the work // of cleaning up the page, freeing the buffer and updating reservations correctly. BufferHandle buffer; - ExtractBuffer(client, handle, &buffer); + Status status = ExtractBuffer(client, handle, &buffer); + DCHECK(status.ok()) << status.msg().msg(); FreeBuffer(client, &buffer); } else { // In the unpinned case, no reservations are used so we just clean up the page. @@ -158,7 +173,7 @@ Status BufferPool::Pin(ClientHandle* client, PageHandle* handle) { Page* page = handle->page_; if (page->pin_count == 0) { - RETURN_IF_ERROR(client->impl_->MoveToPinned(client, handle)); + RETURN_IF_ERROR(client->impl_->StartMoveToPinned(client, page)); COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, -page->len); } // Update accounting last to avoid complicating the error return path above. @@ -178,23 +193,34 @@ void BufferPool::Unpin(ClientHandle* client, PageHandle* handle) { reservation->ReleaseTo(page->len); if (--page->pin_count > 0) return; - client->impl_->MoveToDirtyUnpinned(page); + if (page->pin_in_flight) { + // Data is not in memory - move it back to evicted. + client->impl_->UndoMoveEvictedToPinned(page); + } else { + // Data is in memory - move it to dirty unpinned. + client->impl_->MoveToDirtyUnpinned(page); + } COUNTER_ADD(client->impl_->counters().total_unpinned_bytes, handle->len()); COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, handle->len()); } -void BufferPool::ExtractBuffer( +Status BufferPool::ExtractBuffer( ClientHandle* client, PageHandle* page_handle, BufferHandle* buffer_handle) { DCHECK(page_handle->is_pinned()); DCHECK(!buffer_handle->is_open()); DCHECK_EQ(page_handle->client_, client); + // If an async pin is in flight, we need to wait for it. 
+ const BufferHandle* dummy; + RETURN_IF_ERROR(page_handle->GetBuffer(&dummy)); + // Bring the pin count to 1 so that we're not using surplus reservations. while (page_handle->pin_count() > 1) Unpin(client, page_handle); // Destroy the page and extract the buffer. client->impl_->DestroyPageInternal(page_handle, buffer_handle); DCHECK(buffer_handle->is_open()); + return Status::OK(); } Status BufferPool::AllocateBuffer( @@ -347,6 +373,7 @@ void BufferPool::Client::MoveToDirtyUnpinned(Page* page) { // Only valid to unpin pages if spilling is enabled. DCHECK(spilling_enabled()); DCHECK_EQ(0, page->pin_count); + unique_lock<mutex> lock(lock_); DCHECK_CONSISTENCY(); DCHECK(pinned_pages_.Contains(page)); @@ -357,8 +384,7 @@ void BufferPool::Client::MoveToDirtyUnpinned(Page* page) { WriteDirtyPagesAsync(); } -Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle) { - Page* page = handle->page_; +Status BufferPool::Client::StartMoveToPinned(ClientHandle* client, Page* page) { unique_lock<mutex> cl(lock_); DCHECK_CONSISTENCY(); // Propagate any write errors that occurred for this client. @@ -390,31 +416,55 @@ Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle return file_group_->RestoreData(move(page->write_handle), page->buffer.mem_range()); } // If the page wasn't in the clean pages list, it must have been evicted. - return MoveEvictedToPinned(&cl, client, handle); + return StartMoveEvictedToPinned(&cl, client, page); } -Status BufferPool::Client::MoveEvictedToPinned( - unique_lock<mutex>* client_lock, ClientHandle* client, PageHandle* handle) { - Page* page = handle->page_; +Status BufferPool::Client::StartMoveEvictedToPinned( + unique_lock<mutex>* client_lock, ClientHandle* client, Page* page) { DCHECK(!page->buffer.is_open()); - // Don't hold any locks while allocating or reading back the data. 
It is safe to modify - // the page's buffer handle without holding any locks because no concurrent operations - // can modify evicted pages. - client_lock->unlock(); + // Safe to modify the page's buffer handle without holding the page lock because no + // concurrent operations can modify evicted pages. BufferHandle buffer; RETURN_IF_ERROR(pool_->allocator_->Allocate(client, page->len, &page->buffer)); COUNTER_ADD(counters().bytes_read, page->len); COUNTER_ADD(counters().read_io_ops, 1); - { - SCOPED_TIMER(counters().read_wait_time); - RETURN_IF_ERROR( - file_group_->Read(page->write_handle.get(), page->buffer.mem_range())); - } - file_group_->DestroyWriteHandle(move(page->write_handle)); - client_lock->lock(); + RETURN_IF_ERROR( + file_group_->ReadAsync(page->write_handle.get(), page->buffer.mem_range())); pinned_pages_.Enqueue(page); + page->pin_in_flight = true; + DCHECK_CONSISTENCY(); + return Status::OK(); +} + +void BufferPool::Client::UndoMoveEvictedToPinned(Page* page) { + // We need to get the page back to the evicted state where: + // * There is no in-flight read. + // * The page's data is on disk referenced by 'write_handle' + // * The page has no attached buffer. + DCHECK(page->pin_in_flight); + page->write_handle->CancelRead(); + page->pin_in_flight = false; + + unique_lock<mutex> lock(lock_); DCHECK_CONSISTENCY(); + DCHECK(pinned_pages_.Contains(page)); + pinned_pages_.Remove(page); + // Discard the buffer - the pin was in flight so there was no way that a valid + // reference to the buffer's contents was returned since the pin was still in flight. + pool_->allocator_->Free(move(page->buffer)); +} + +Status BufferPool::Client::FinishMoveEvictedToPinned(Page* page) { + DCHECK(page->pin_in_flight); + SCOPED_TIMER(counters().read_wait_time); + // Don't hold any locks while reading back the data. It is safe to modify the page's + // buffer handle without holding any locks because no concurrent operations can modify + // evicted pages. 
+ RETURN_IF_ERROR( + file_group_->WaitForAsyncRead(page->write_handle.get(), page->buffer.mem_range())); + file_group_->DestroyWriteHandle(move(page->write_handle)); + page->pin_in_flight = false; return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/bufferpool/buffer-pool.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h index 9a37923..dbf75bc 100644 --- a/be/src/runtime/bufferpool/buffer-pool.h +++ b/be/src/runtime/bufferpool/buffer-pool.h @@ -133,7 +133,9 @@ class SystemAllocator; /// * Once the operator needs the page's contents again and has sufficient unused /// reservation, it can call Pin(), which brings the page's contents back into memory, /// perhaps in a different buffer. Therefore the operator must fix up any pointers into -/// the previous buffer. +/// the previous buffer. Pin() can execute asynchronously - the caller only blocks +/// waiting for read I/O if it calls GetBuffer() or ExtractBuffer() while the read is +/// in flight. /// * If the operator is done with the page, it can call FreeBuffer() to destroy the /// handle and release resources, or call ExtractBuffer() to extract the buffer. /// @@ -184,15 +186,19 @@ class BufferPool : public CacheLineAligned { /// sufficient unused reservation to pin the new page (otherwise it will DCHECK). /// CreatePage() only fails when a system error prevents the buffer pool from fulfilling /// the reservation. - /// On success, the handle is mapped to the new page. - Status CreatePage( - ClientHandle* client, int64_t len, PageHandle* handle) WARN_UNUSED_RESULT; + /// On success, the handle is mapped to the new page and 'buffer', if non-NULL, is set + /// to the page's buffer. 
+ Status CreatePage(ClientHandle* client, int64_t len, PageHandle* handle, + const BufferHandle** buffer = nullptr) WARN_UNUSED_RESULT; /// Increment the pin count of 'handle'. After Pin() the underlying page will - /// be mapped to a buffer, which will be accessible through 'handle'. Uses - /// reservation from 'client'. The caller is responsible for ensuring it has enough - /// unused reservation before calling Pin() (otherwise it will DCHECK). Pin() only - /// fails when a system error prevents the buffer pool from fulfilling the reservation. + /// be mapped to a buffer, which will be accessible through 'handle'. If the data + /// was evicted from memory, it will be read back into memory asynchronously. + /// Attempting to access the buffer with ExtractBuffer() or handle.GetBuffer() will + /// block until the data is in memory. The caller is responsible for ensuring it has + /// enough unused reservation before calling Pin() (otherwise it will DCHECK). Pin() + /// only fails when a system error prevents the buffer pool from fulfilling the + /// reservation or if an I/O error is encountered reading back data from disk. /// 'handle' must be open. Status Pin(ClientHandle* client, PageHandle* handle) WARN_UNUSED_RESULT; @@ -214,9 +220,11 @@ class BufferPool : public CacheLineAligned { /// Extracts buffer from a pinned page. After this returns, the page referenced by /// 'page_handle' will be destroyed and 'buffer_handle' will reference the buffer from /// 'page_handle'. This may decrease reservation usage of 'client' if the page was - /// pinned multiple times via 'page_handle'. - void ExtractBuffer( - ClientHandle* client, PageHandle* page_handle, BufferHandle* buffer_handle); + /// pinned multiple times via 'page_handle'. May return an error if 'page_handle' was + /// unpinned earlier with no subsequent GetBuffer() call and a read error is + /// encountered while bringing the page back into memory. 
+ Status ExtractBuffer( + ClientHandle* client, PageHandle* page_handle, BufferHandle* buffer_handle) WARN_UNUSED_RESULT; /// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. The caller /// is responsible for ensuring it has enough unused reservation before calling @@ -400,19 +408,15 @@ class BufferPool::PageHandle { bool is_pinned() const { return pin_count() > 0; } int pin_count() const; int64_t len() const; - /// Get a pointer to the start of the page's buffer. Only valid to call if the page - /// is pinned. - uint8_t* data() const { return buffer_handle()->data(); } - /// Convenience function to get the memory range for the page's buffer. Only valid to - /// call if the page is pinned. - MemRange mem_range() const { return buffer_handle()->mem_range(); } - - /// Return a pointer to the page's buffer handle. Only valid to call if the page is - /// pinned via this handle. Only const accessors of the returned handle can be used: - /// it is invalid to call FreeBuffer() or TransferBuffer() on it or to otherwise modify - /// the handle. - const BufferHandle* buffer_handle() const; + /// Get a reference to the page's buffer handle. Only valid to call if the page is + /// pinned. If the page was previously unpinned and the read I/O for the data is still + /// in flight, this can block waiting. Returns an error if an error was encountered + /// reading the data back, which can only happen if Unpin() was called on the page + /// since the last call to GetBuffer(). Only const accessors of the returned handle can + /// be used: it is invalid to call FreeBuffer() or TransferBuffer() on it or to + /// otherwise modify the handle. + Status GetBuffer(const BufferHandle** buffer_handle) const WARN_UNUSED_RESULT; std::string DebugString() const; @@ -431,9 +435,8 @@ class BufferPool::PageHandle { /// The internal page structure. NULL if the handle is not open. 
Page* page_; - /// The client the page handle belongs to, used to validate that the correct client - /// is being used. - const ClientHandle* client_; + /// The client the page handle belongs to. + ClientHandle* client_; }; inline BufferPool::BufferHandle::BufferHandle(BufferHandle&& src) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/tmp-file-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index 293a898..bc09f2e 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -368,36 +368,44 @@ Status TmpFileMgr::FileGroup::Write( } Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) { + RETURN_IF_ERROR(ReadAsync(handle, buffer)); + return WaitForAsyncRead(handle, buffer); +} + +Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) { DCHECK(handle->write_range_ != nullptr); DCHECK(!handle->is_cancelled_); DCHECK_EQ(buffer.len(), handle->len()); Status status; - // Don't grab 'lock_' in this method - it is not necessary because we don't touch - // any members that it protects and could block other threads for the duration of - // the synchronous read. + // Don't grab 'write_state_lock_' in this method - it is not necessary because we + // don't touch any members that it protects and could block other threads for the + // duration of the synchronous read. DCHECK(!handle->write_in_flight_); + DCHECK(handle->read_range_ == nullptr); DCHECK(handle->write_range_ != nullptr); - // Don't grab handle->lock_, it is safe to touch all of handle's state since the - // write is not in flight. 
- DiskIoMgr::ScanRange* scan_range = scan_range_pool_.Add(new DiskIoMgr::ScanRange); - scan_range->Reset(nullptr, handle->write_range_->file(), handle->write_range_->len(), - handle->write_range_->offset(), handle->write_range_->disk_id(), false, + // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state + // since the write is not in flight. + handle->read_range_ = scan_range_pool_.Add(new DiskIoMgr::ScanRange); + handle->read_range_->Reset(nullptr, handle->write_range_->file(), + handle->write_range_->len(), handle->write_range_->offset(), + handle->write_range_->disk_id(), false, DiskIoMgr::BufferOpts::ReadInto(buffer.data(), buffer.len())); - DiskIoMgr::BufferDescriptor* io_mgr_buffer = nullptr; - { - SCOPED_TIMER(disk_read_timer_); - read_counter_->Add(1); - bytes_read_counter_->Add(buffer.len()); - status = io_mgr_->Read(io_ctx_, scan_range, &io_mgr_buffer); - if (!status.ok()) goto exit; - } - - if (FLAGS_disk_spill_encryption) { - status = handle->CheckHashAndDecrypt(buffer); - if (!status.ok()) goto exit; - } + read_counter_->Add(1); + bytes_read_counter_->Add(buffer.len()); + RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_, handle->read_range_, true)); + return Status::OK(); +} +Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* handle, MemRange buffer) { + DCHECK(handle->read_range_ != nullptr); + // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state + // since the write is not in flight. 
+ SCOPED_TIMER(disk_read_timer_); + DiskIoMgr::BufferDescriptor* io_mgr_buffer = nullptr; + Status status = handle->read_range_->GetNext(&io_mgr_buffer); + if (!status.ok()) goto exit; + DCHECK(io_mgr_buffer != NULL); DCHECK(io_mgr_buffer->eosr()); DCHECK_LE(io_mgr_buffer->len(), buffer.len()); if (io_mgr_buffer->len() < buffer.len()) { @@ -409,9 +417,14 @@ Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) { } DCHECK_EQ(io_mgr_buffer->buffer(), buffer.data()); + if (FLAGS_disk_spill_encryption) { + status = handle->CheckHashAndDecrypt(buffer); + if (!status.ok()) goto exit; + } exit: // Always return the buffer before exiting to avoid leaking it. if (io_mgr_buffer != nullptr) io_mgr_buffer->Return(); + handle->read_range_ = nullptr; return status; } @@ -420,6 +433,7 @@ Status TmpFileMgr::FileGroup::RestoreData( DCHECK_EQ(handle->write_range_->data(), buffer.data()); DCHECK_EQ(handle->len(), buffer.len()); DCHECK(!handle->write_in_flight_); + DCHECK(handle->read_range_ == nullptr); // Decrypt after the write is finished, so that we don't accidentally write decrypted // data to disk. Status status; @@ -501,6 +515,7 @@ TmpFileMgr::WriteHandle::WriteHandle( : cb_(cb), encryption_timer_(encryption_timer), file_(nullptr), + read_range_(nullptr), is_cancelled_(false), write_in_flight_(false) {} @@ -570,9 +585,20 @@ void TmpFileMgr::WriteHandle::WriteComplete(const Status& write_status) { } void TmpFileMgr::WriteHandle::Cancel() { - unique_lock<mutex> lock(write_state_lock_); - is_cancelled_ = true; - // TODO: in future, if DiskIoMgr supported cancellation, we could cancel it here. + CancelRead(); + { + unique_lock<mutex> lock(write_state_lock_); + is_cancelled_ = true; + // TODO: in future, if DiskIoMgr supported write cancellation, we could cancel it + // here. 
+ } +} + +void TmpFileMgr::WriteHandle::CancelRead() { + if (read_range_ != nullptr) { + read_range_->Cancel(Status::CANCELLED); + read_range_ = nullptr; + } } void TmpFileMgr::WriteHandle::WaitForWrite() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/tmp-file-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h index d66c527..c71c370 100644 --- a/be/src/runtime/tmp-file-mgr.h +++ b/be/src/runtime/tmp-file-mgr.h @@ -123,9 +123,22 @@ class TmpFileMgr { /// Synchronously read the data referenced by 'handle' from the temporary file into /// 'buffer'. buffer.len() must be the same as handle->len(). Can only be called - /// after a write successfully completes. + /// after a write successfully completes. Should not be called while an async read + /// is in flight. Equivalent to calling ReadAsync() then WaitForAsyncRead(). Status Read(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT; + /// Asynchronously read the data referenced by 'handle' from the temporary file into + /// 'buffer'. buffer.len() must be the same as handle->len(). Can only be called + /// after a write successfully completes. WaitForAsyncRead() must be called before the + /// data in the buffer is valid. Should not be called while an async read + /// is already in flight. + Status ReadAsync(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT; + + /// Wait until the read started for 'handle' by ReadAsync() completes. 'buffer' + /// should be the same buffer passed into ReadAsync(). Returns an error if the + /// read fails. Retrying a failed read by calling ReadAsync() again is allowed. + Status WaitForAsyncRead(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT; + /// Restore the original data in the 'buffer' passed to Write(), decrypting or /// decompressing as necessary. Returns an error if restoring the data fails. 
/// The write must not be in-flight - the caller is responsible for waiting for @@ -215,7 +228,7 @@ class TmpFileMgr { /// Amount of scratch space allocated in bytes. RuntimeProfile::Counter* const scratch_space_bytes_used_counter_; - /// Time taken for disk reads. + /// Time spent waiting for disk reads. RuntimeProfile::Counter* const disk_read_timer_; /// Time spent in disk spill encryption, decryption, and integrity checking. @@ -263,14 +276,21 @@ class TmpFileMgr { public: /// The write must be destroyed by passing it to FileGroup - destroying it before /// the write completes is an error. - ~WriteHandle() { DCHECK(!write_in_flight_); } + ~WriteHandle() { + DCHECK(!write_in_flight_); + DCHECK(read_range_ == nullptr); + } - /// Cancels the write asynchronously. After Cancel() is called, writes are not + /// Cancels any in-flight writes or reads. Reads are cancelled synchronously and + /// writes are cancelled asynchronously. After Cancel() is called, writes are not /// retried. The write callback may be called with a CANCELLED status (unless /// it succeeded or encountered a different error first). /// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't need it. void Cancel(); + /// Cancel any in-flight read synchronously. + void CancelRead(); + /// Blocks until the write completes either successfully or unsuccessfully. /// May return before the write callback has been called. /// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't need it. @@ -333,6 +353,10 @@ class TmpFileMgr { /// on writes; verified on reads. This is calculated _after_ encryption. IntegrityHash hash_; + /// The scan range for the read that is currently in flight. NULL when no read is in + /// flight. + DiskIoMgr::ScanRange* read_range_; + /// Protects all fields below while 'write_in_flight_' is true. At other times, it is /// invalid to call WriteRange/FileGroup methods concurrently from multiple threads, /// so no locking is required. 
This is a terminal lock and should not be held while
