IMPALA-5676: avoid expensive consistency checks in BTSv2

Doing an O(n) consistency check every time the read or write
page was advanced resulted in O(n^2) overall runtime.

The fix is to separate the O(1) and O(n) checks and only do the
O(n) checks if:
* The function does an O(n) pass over the pages anyway (e.g.
  PinStream())
* The function is called only once per read or write pass over the
  stream.

This should make the cost of the checks O(n) (if we make the reasonable
assumption that PrepareForWrite(), PrepareForRead(), PinStream() and
UnpinStream() are called a bounded number of times per stream).

Testing:
Ran BufferedTupleStreamV2Test.

Change-Id: I8b380fcd0568cb73b36a490954bcd316db969ede
Reviewed-on: http://gerrit.cloudera.org:8080/7459
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9d1e4449
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9d1e4449
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9d1e4449

Branch: refs/heads/master
Commit: 9d1e4449cdf3f83dda6d87ebbb7bb4a6c854ce0b
Parents: 9f2f065
Author: Tim Armstrong <[email protected]>
Authored: Tue Jul 18 16:19:42 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jul 20 22:53:25 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/buffered-tuple-stream-v2.cc | 64 +++++++++++++++----------
 be/src/runtime/buffered-tuple-stream-v2.h  |  9 +++-
 2 files changed, 45 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9d1e4449/be/src/runtime/buffered-tuple-stream-v2.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc b/be/src/runtime/buffered-tuple-stream-v2.cc
index faa9008..2153264 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2.cc
@@ -36,9 +36,11 @@
 #include "common/names.h"
 
 #ifdef NDEBUG
-#define CHECK_CONSISTENCY()
+#define CHECK_CONSISTENCY_FAST()
+#define CHECK_CONSISTENCY_FULL()
 #else
-#define CHECK_CONSISTENCY() CheckConsistency()
+#define CHECK_CONSISTENCY_FAST() CheckConsistencyFast()
+#define CHECK_CONSISTENCY_FULL() CheckConsistencyFull()
 #endif
 
 using namespace impala;
@@ -112,18 +114,19 @@ BufferedTupleStreamV2::~BufferedTupleStreamV2() {
   DCHECK(closed_);
 }
 
-void BufferedTupleStreamV2::CheckConsistency() const {
+void BufferedTupleStreamV2::CheckConsistencyFull() const {
+  CheckConsistencyFast();
+  // The below checks require iterating over all the pages in the stream.
   DCHECK_EQ(bytes_pinned_, CalcBytesPinned()) << DebugString();
   DCHECK_EQ(pages_.size(), num_pages_) << DebugString();
-  for (const Page& page : pages_) {
-    DCHECK_EQ(ExpectedPinCount(pinned_, &page), page.pin_count()) << DebugString();
-    // Only one large row per page.
-    if (page.len() > default_page_len_) DCHECK_LE(page.num_rows, 1);
-    // We only create pages when we have a row to append to them.
-    DCHECK_GT(page.num_rows, 0);
-  }
+  for (const Page& page : pages_) CheckPageConsistency(&page);
+}
+
+void BufferedTupleStreamV2::CheckConsistencyFast() const {
+  // All the below checks should be O(1).
   DCHECK(has_write_iterator() || write_page_ == nullptr);
   if (write_page_ != nullptr) {
+    CheckPageConsistency(write_page_);
     DCHECK(write_page_->is_pinned());
     DCHECK(write_page_->retrieved_buffer);
     const BufferHandle* write_buffer;
@@ -135,6 +138,7 @@ void BufferedTupleStreamV2::CheckConsistency() const {
   }
   DCHECK(has_read_iterator() || read_page_ == pages_.end());
   if (read_page_ != pages_.end()) {
+    CheckPageConsistency(&*read_page_);
     DCHECK(read_page_->is_pinned());
     DCHECK(read_page_->retrieved_buffer);
     // Can't check read buffer without affecting behaviour, because a read may 
be in
@@ -154,6 +158,14 @@ void BufferedTupleStreamV2::CheckConsistency() const {
   }
 }
 
+void BufferedTupleStreamV2::CheckPageConsistency(const Page* page) const {
+  DCHECK_EQ(ExpectedPinCount(pinned_, page), page->pin_count()) << DebugString();
+  // Only one large row per page.
+  if (page->len() > default_page_len_) DCHECK_LE(page->num_rows, 1);
+  // We only create pages when we have a row to append to them.
+  DCHECK_GT(page->num_rows, 0);
+}
+
 string BufferedTupleStreamV2::DebugString() const {
   stringstream ss;
   ss << "BufferedTupleStreamV2 num_rows=" << num_rows_
@@ -205,14 +217,14 @@ Status BufferedTupleStreamV2::PrepareForWrite(bool* 
got_reservation) {
   DCHECK(!delete_on_read_);
   DCHECK(!has_write_iterator());
   DCHECK(!has_read_iterator());
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
 
   *got_reservation = 
buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
   if (!*got_reservation) return Status::OK();
   has_write_iterator_ = true;
   // Save reservation for the write iterators.
   buffer_pool_client_->SaveReservation(&write_page_reservation_, 
default_page_len_);
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
   return Status::OK();
 }
 
@@ -223,7 +235,7 @@ Status BufferedTupleStreamV2::PrepareForReadWrite(
   DCHECK(!delete_on_read_);
   DCHECK(!has_write_iterator());
   DCHECK(!has_read_iterator());
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
 
   *got_reservation = buffer_pool_client_->IncreaseReservationToFit(2 * 
default_page_len_);
   if (!*got_reservation) return Status::OK();
@@ -380,7 +392,7 @@ Status BufferedTupleStreamV2::CalcPageLenForRow(int64_t 
row_size, int64_t* page_
 Status BufferedTupleStreamV2::AdvanceWritePage(
     int64_t row_size, bool* got_reservation) noexcept {
   DCHECK(has_write_iterator());
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FAST();
 
   int64_t page_len;
   RETURN_IF_ERROR(CalcPageLenForRow(row_size, &page_len));
@@ -412,7 +424,7 @@ Status BufferedTupleStreamV2::AdvanceWritePage(
           - write_page_reservation_to_reclaim)) {
     DCHECK(pinned_ || page_len > default_page_len_)
         << "If the stream is unpinned, this should only fail for large pages";
-    CHECK_CONSISTENCY();
+    CHECK_CONSISTENCY_FAST();
     *got_reservation = false;
     return Status::OK();
   }
@@ -461,7 +473,7 @@ void BufferedTupleStreamV2::InvalidateWriteIterator() {
 Status BufferedTupleStreamV2::NextReadPage() {
   DCHECK(has_read_iterator());
   DCHECK(!closed_);
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FAST();
 
   if (read_page_ == pages_.end()) {
     // No rows read yet - start reading at first page. If the stream is 
unpinned, we can
@@ -488,7 +500,7 @@ Status BufferedTupleStreamV2::NextReadPage() {
   }
 
   if (read_page_ == pages_.end()) {
-    CHECK_CONSISTENCY();
+    CHECK_CONSISTENCY_FULL();
     return Status::OK();
   }
 
@@ -526,7 +538,7 @@ Status BufferedTupleStreamV2::NextReadPage() {
              write_page_ != nullptr, has_read_write_page())) {
     buffer_pool_client_->SaveReservation(&write_page_reservation_, 
default_page_len_);
   }
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FAST();
   return Status::OK();
 }
 
@@ -551,7 +563,7 @@ void BufferedTupleStreamV2::InvalidateReadIterator() {
 }
 
 Status BufferedTupleStreamV2::PrepareForRead(bool delete_on_read, bool* 
got_reservation) {
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
   InvalidateWriteIterator();
   InvalidateReadIterator();
   // If already pinned, no additional pin is needed (see ExpectedPinCount()).
@@ -588,13 +600,13 @@ Status BufferedTupleStreamV2::PrepareForReadInternal(bool 
delete_on_read) {
   read_page_rows_returned_ = 0;
   rows_returned_ = 0;
   delete_on_read_ = delete_on_read;
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
   return Status::OK();
 }
 
 Status BufferedTupleStreamV2::PinStream(bool* pinned) {
   DCHECK(!closed_);
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
   if (pinned_) {
     *pinned = true;
     return Status::OK();
@@ -634,12 +646,12 @@ Status BufferedTupleStreamV2::PinStream(bool* pinned) {
 
   pinned_ = true;
   *pinned = true;
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
   return Status::OK();
 }
 
 void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) {
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
   DCHECK(!closed_);
   if (mode == UNPIN_ALL) {
     // Invalidate the iterators so they don't keep pages pinned.
@@ -648,7 +660,7 @@ void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) {
   }
 
   if (pinned_) {
-    CHECK_CONSISTENCY();
+    CHECK_CONSISTENCY_FULL();
     // If the stream was pinned, there may be some remaining pinned pages that 
should
     // be unpinned at this point.
     for (Page& page : pages_) UnpinPageIfNeeded(&page, false);
@@ -662,7 +674,7 @@ void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) {
     }
     pinned_ = false;
   }
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FULL();
 }
 
 Status BufferedTupleStreamV2::GetRows(
@@ -905,7 +917,7 @@ void BufferedTupleStreamV2::AddLargeRowCustomEnd(int64_t 
size) noexcept {
     buffer_pool_client_->SaveReservation(&write_page_reservation_, 
default_page_len_);
   }
   // The stream should be in a consistent state once the row is added.
-  CHECK_CONSISTENCY();
+  CHECK_CONSISTENCY_FAST();
 }
 
 bool BufferedTupleStreamV2::AddRow(TupleRow* row, Status* status) noexcept {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9d1e4449/be/src/runtime/buffered-tuple-stream-v2.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.h b/be/src/runtime/buffered-tuple-stream-v2.h
index 26f2113..2023124 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.h
+++ b/be/src/runtime/buffered-tuple-stream-v2.h
@@ -692,8 +692,13 @@ class BufferedTupleStreamV2 {
   int64_t CalcBytesPinned() const;
 
   /// DCHECKs if the stream is internally inconsistent. The stream should always be in
-  /// a consistent state after returning success from a public API call.
-  void CheckConsistency() const;
+  /// a consistent state after returning success from a public API call. The Fast version
+  /// has constant runtime and does not check all of 'pages_'. The Full version includes
+  /// O(n) checks that require iterating over the whole 'pages_' list (e.g. checking that
+  /// each page is in a valid state).
+  void CheckConsistencyFast() const;
+  void CheckConsistencyFull() const;
+  void CheckPageConsistency(const Page* page) const;
 };
 }
 

Reply via email to