IMPALA-5676: avoid expensive consistency checks in BTSv2

Doing an O(n) consistency check every time the read or write page is advanced results in O(n^2) overall runtime.
The fix is to separate the O(1) and O(n) checks and only do the O(n) checks if:
* The function does an O(n) pass over the pages anyway (e.g. PinStream()), or
* The function is called only once per read or write pass over the stream.

This should make the total cost of the checks O(n) (if we make the reasonable assumption that PrepareForWrite(), PrepareForRead(), PinStream() and UnpinStream() are called a bounded number of times per stream).

Testing: Ran BufferedTupleStreamV2Test.

Change-Id: I8b380fcd0568cb73b36a490954bcd316db969ede Reviewed-on: http://gerrit.cloudera.org:8080/7459 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9d1e4449 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9d1e4449 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9d1e4449 Branch: refs/heads/master Commit: 9d1e4449cdf3f83dda6d87ebbb7bb4a6c854ce0b Parents: 9f2f065 Author: Tim Armstrong <[email protected]> Authored: Tue Jul 18 16:19:42 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Jul 20 22:53:25 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/buffered-tuple-stream-v2.cc | 64 +++++++++++++++---------- be/src/runtime/buffered-tuple-stream-v2.h | 9 +++- 2 files changed, 45 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9d1e4449/be/src/runtime/buffered-tuple-stream-v2.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc b/be/src/runtime/buffered-tuple-stream-v2.cc index faa9008..2153264 100644 --- a/be/src/runtime/buffered-tuple-stream-v2.cc +++ b/be/src/runtime/buffered-tuple-stream-v2.cc @@ -36,9 +36,11 @@ #include 
"common/names.h" #ifdef NDEBUG -#define CHECK_CONSISTENCY() +#define CHECK_CONSISTENCY_FAST() +#define CHECK_CONSISTENCY_FULL() #else -#define CHECK_CONSISTENCY() CheckConsistency() +#define CHECK_CONSISTENCY_FAST() CheckConsistencyFast() +#define CHECK_CONSISTENCY_FULL() CheckConsistencyFull() #endif using namespace impala; @@ -112,18 +114,19 @@ BufferedTupleStreamV2::~BufferedTupleStreamV2() { DCHECK(closed_); } -void BufferedTupleStreamV2::CheckConsistency() const { +void BufferedTupleStreamV2::CheckConsistencyFull() const { + CheckConsistencyFast(); + // The below checks require iterating over all the pages in the stream. DCHECK_EQ(bytes_pinned_, CalcBytesPinned()) << DebugString(); DCHECK_EQ(pages_.size(), num_pages_) << DebugString(); - for (const Page& page : pages_) { - DCHECK_EQ(ExpectedPinCount(pinned_, &page), page.pin_count()) << DebugString(); - // Only one large row per page. - if (page.len() > default_page_len_) DCHECK_LE(page.num_rows, 1); - // We only create pages when we have a row to append to them. - DCHECK_GT(page.num_rows, 0); - } + for (const Page& page : pages_) CheckPageConsistency(&page); +} + +void BufferedTupleStreamV2::CheckConsistencyFast() const { + // All the below checks should be O(1). 
DCHECK(has_write_iterator() || write_page_ == nullptr); if (write_page_ != nullptr) { + CheckPageConsistency(write_page_); DCHECK(write_page_->is_pinned()); DCHECK(write_page_->retrieved_buffer); const BufferHandle* write_buffer; @@ -135,6 +138,7 @@ void BufferedTupleStreamV2::CheckConsistency() const { } DCHECK(has_read_iterator() || read_page_ == pages_.end()); if (read_page_ != pages_.end()) { + CheckPageConsistency(&*read_page_); DCHECK(read_page_->is_pinned()); DCHECK(read_page_->retrieved_buffer); // Can't check read buffer without affecting behaviour, because a read may be in @@ -154,6 +158,14 @@ void BufferedTupleStreamV2::CheckConsistency() const { } } +void BufferedTupleStreamV2::CheckPageConsistency(const Page* page) const { + DCHECK_EQ(ExpectedPinCount(pinned_, page), page->pin_count()) << DebugString(); + // Only one large row per page. + if (page->len() > default_page_len_) DCHECK_LE(page->num_rows, 1); + // We only create pages when we have a row to append to them. + DCHECK_GT(page->num_rows, 0); +} + string BufferedTupleStreamV2::DebugString() const { stringstream ss; ss << "BufferedTupleStreamV2 num_rows=" << num_rows_ @@ -205,14 +217,14 @@ Status BufferedTupleStreamV2::PrepareForWrite(bool* got_reservation) { DCHECK(!delete_on_read_); DCHECK(!has_write_iterator()); DCHECK(!has_read_iterator()); - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FULL(); *got_reservation = buffer_pool_client_->IncreaseReservationToFit(default_page_len_); if (!*got_reservation) return Status::OK(); has_write_iterator_ = true; // Save reservation for the write iterators. 
buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FULL(); return Status::OK(); } @@ -223,7 +235,7 @@ Status BufferedTupleStreamV2::PrepareForReadWrite( DCHECK(!delete_on_read_); DCHECK(!has_write_iterator()); DCHECK(!has_read_iterator()); - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FULL(); *got_reservation = buffer_pool_client_->IncreaseReservationToFit(2 * default_page_len_); if (!*got_reservation) return Status::OK(); @@ -380,7 +392,7 @@ Status BufferedTupleStreamV2::CalcPageLenForRow(int64_t row_size, int64_t* page_ Status BufferedTupleStreamV2::AdvanceWritePage( int64_t row_size, bool* got_reservation) noexcept { DCHECK(has_write_iterator()); - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FAST(); int64_t page_len; RETURN_IF_ERROR(CalcPageLenForRow(row_size, &page_len)); @@ -412,7 +424,7 @@ Status BufferedTupleStreamV2::AdvanceWritePage( - write_page_reservation_to_reclaim)) { DCHECK(pinned_ || page_len > default_page_len_) << "If the stream is unpinned, this should only fail for large pages"; - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FAST(); *got_reservation = false; return Status::OK(); } @@ -461,7 +473,7 @@ void BufferedTupleStreamV2::InvalidateWriteIterator() { Status BufferedTupleStreamV2::NextReadPage() { DCHECK(has_read_iterator()); DCHECK(!closed_); - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FAST(); if (read_page_ == pages_.end()) { // No rows read yet - start reading at first page. 
If the stream is unpinned, we can @@ -488,7 +500,7 @@ Status BufferedTupleStreamV2::NextReadPage() { } if (read_page_ == pages_.end()) { - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FULL(); return Status::OK(); } @@ -526,7 +538,7 @@ Status BufferedTupleStreamV2::NextReadPage() { write_page_ != nullptr, has_read_write_page())) { buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); } - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FAST(); return Status::OK(); } @@ -551,7 +563,7 @@ void BufferedTupleStreamV2::InvalidateReadIterator() { } Status BufferedTupleStreamV2::PrepareForRead(bool delete_on_read, bool* got_reservation) { - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FULL(); InvalidateWriteIterator(); InvalidateReadIterator(); // If already pinned, no additional pin is needed (see ExpectedPinCount()). @@ -588,13 +600,13 @@ Status BufferedTupleStreamV2::PrepareForReadInternal(bool delete_on_read) { read_page_rows_returned_ = 0; rows_returned_ = 0; delete_on_read_ = delete_on_read; - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FULL(); return Status::OK(); } Status BufferedTupleStreamV2::PinStream(bool* pinned) { DCHECK(!closed_); - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FULL(); if (pinned_) { *pinned = true; return Status::OK(); @@ -634,12 +646,12 @@ Status BufferedTupleStreamV2::PinStream(bool* pinned) { pinned_ = true; *pinned = true; - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FULL(); return Status::OK(); } void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) { - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FULL(); DCHECK(!closed_); if (mode == UNPIN_ALL) { // Invalidate the iterators so they don't keep pages pinned. @@ -648,7 +660,7 @@ void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) { } if (pinned_) { - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FULL(); // If the stream was pinned, there may be some remaining pinned pages that should // be unpinned at this point. 
for (Page& page : pages_) UnpinPageIfNeeded(&page, false); @@ -662,7 +674,7 @@ void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) { } pinned_ = false; } - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FULL(); } Status BufferedTupleStreamV2::GetRows( @@ -905,7 +917,7 @@ void BufferedTupleStreamV2::AddLargeRowCustomEnd(int64_t size) noexcept { buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); } // The stream should be in a consistent state once the row is added. - CHECK_CONSISTENCY(); + CHECK_CONSISTENCY_FAST(); } bool BufferedTupleStreamV2::AddRow(TupleRow* row, Status* status) noexcept { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9d1e4449/be/src/runtime/buffered-tuple-stream-v2.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-v2.h b/be/src/runtime/buffered-tuple-stream-v2.h index 26f2113..2023124 100644 --- a/be/src/runtime/buffered-tuple-stream-v2.h +++ b/be/src/runtime/buffered-tuple-stream-v2.h @@ -692,8 +692,13 @@ class BufferedTupleStreamV2 { int64_t CalcBytesPinned() const; /// DCHECKs if the stream is internally inconsistent. The stream should always be in - /// a consistent state after returning success from a public API call. - void CheckConsistency() const; + /// a consistent state after returning success from a public API call. The Fast version + /// has constant runtime and does not check all of 'pages_'. The Full version includes + /// O(n) checks that require iterating over the whole 'pages_' list (e.g. checking that + /// each page is in a valid state). + void CheckConsistencyFast() const; + void CheckConsistencyFull() const; + void CheckPageConsistency(const Page* page) const; }; }
