This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.0.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 80a1dc33d33538af609a0f1dd761363a06d279b8
Author: Riza Suminto <[email protected]>
AuthorDate: Thu Sep 16 15:12:22 2021 -0700

    IMPALA-10714: Defer advancing read page until the buffer is attached
    
    If a BufferedTupleStream is a read-write stream and set up with
    attach_on_read = true, BufferedTupleStream::NextReadPage() expects that
    the page buffer is attached to the output row batch before advancing the
    read iterator. However, BufferedTupleStream::GetNextInternal() will not
    attach the page buffer of a fully read page if it is a read-write page.
    
    Consider the following scenario:
    1. Only 1 page is left in the stream, and it is a read-write page.
    2. GetNext() has fully read the page, but does NOT attach the buffer to
       the output row batch because it is a read-write page.
    3. The stream writer inserts more rows, but the read-write page cannot
       fit any more rows. Therefore, new pages are created.
    4. The stream writer calls UnpinStream().
    5. UnpinStream() calls NextReadPage(), which in turn fails the
       assertion "read_iter->read_page_->attached_to_output_batch".
    
    BufferedTupleStream::UnpinStream() needs to defer advancing the read
    page until the next GetNext() call when this situation happens.
    
    This patch adds the BE tests StreamStateTest.UnpinFullyExhaustedReadPage*
    that simulate the corner case. This patch also moves the BE tests
    DeferAdvancingReadPage and ShortDebugString into class StreamStateTest
    to reduce the number of friend class declarations in buffered-tuple-stream.h.
    
    Testing:
    - Ran and passed the BE tests StreamStateTest.UnpinFullyExhaustedReadPage*.
    
    Change-Id: I586ed72ba01cc3f28b0dcb1e202b3ca32a6c3b83
    Reviewed-on: http://gerrit.cloudera.org:8080/17853
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/buffered-tuple-stream-test.cc | 229 ++++++++++++++++++++++++++-
 be/src/runtime/buffered-tuple-stream.cc      |  30 ++--
 be/src/runtime/buffered-tuple-stream.h       |   4 +-
 3 files changed, 244 insertions(+), 19 deletions(-)

diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index dfdfeba..ad34f5e 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -569,6 +569,38 @@ class ArrayTupleStreamTest : public SimpleTupleStreamTest {
   }
 };
 
+/// Test internal stream state under certain corner cases.
+class StreamStateTest : public SimpleTupleStreamTest {
+ protected:
+  // Test that UnpinStream defers advancing the read page when all rows from 
the read
+  // page are attached to a returned RowBatch but got not enough reservation.
+  void TestDeferAdvancingReadPage();
+
+  // Test unpinning a read-write stream when the read page has been fully 
exhausted but
+  // its buffer is not attached yet to the output row batch.
+  void TestUnpinAfterFullStreamRead(
+      bool read_write, bool attach_on_read, bool refill_before_unpin);
+
+  // Fill up the stream by repeatedly inserting write_batch into the stream 
until it is
+  // full. Return number of rows successfully inserted into the stream.
+  // Stream must be in pinned mode.
+  Status FillUpStream(
+      BufferedTupleStream* stream, RowBatch* write_batch, int64_t& 
num_inserted);
+
+  // Read out the stream until eos is reached. Return number of rows 
successfully read.
+  Status ReadOutStream(
+      BufferedTupleStream* stream, RowBatch* read_batch, int64_t& num_read);
+
+  // Verify that page count, pinned bytes, and unpinned bytes of the stream 
match the
+  // expectation.
+  void VerifyStreamState(BufferedTupleStream* stream, int num_page, int 
num_pinned_page,
+      int num_unpinned_page, int buffer_size);
+
+  // Test that stream's debug string is capped only for the first
+  // BufferedTupleStream::MAX_PAGE_ITER_DEBUG.
+  void TestShortDebugString();
+};
+
 // Basic API test. No data should be going to disk.
 TEST_F(SimpleTupleStreamTest, Basic) {
   Init(numeric_limits<int64_t>::max());
@@ -1297,9 +1329,7 @@ TEST_F(SimpleTupleStreamTest, UnpinReadPage) {
   write_batch->Reset();
 }
 
-// Test that UnpinStream defer advancing the read page when all rows from the 
read page
-// are attached to a returned RowBatch but got not enough reservation.
-TEST_F(SimpleTupleStreamTest, DeferAdvancingReadPage) {
+void StreamStateTest::TestDeferAdvancingReadPage() {
   int num_rows = 1024;
   int buffer_size = 4 * 1024;
   // Only give 2 * buffer_size for the stream initial read and write page 
reservation.
@@ -1315,7 +1345,7 @@ TEST_F(SimpleTupleStreamTest, DeferAdvancingReadPage) {
     // and the output batch has NOT been reset.
     BufferedTupleStream stream(
         runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
-    ASSERT_OK(stream.Init("SimpleTupleStreamTest::DeferAdvancingReadPage", 
true));
+    ASSERT_OK(stream.Init("StreamStateTest::DeferAdvancingReadPage", true));
     ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
     ASSERT_TRUE(got_reservation);
 
@@ -1367,6 +1397,126 @@ TEST_F(SimpleTupleStreamTest, DeferAdvancingReadPage) {
   write_batch->Reset();
 }
 
+void StreamStateTest::TestUnpinAfterFullStreamRead(
+    bool read_write, bool attach_on_read, bool refill_before_unpin) {
+  DCHECK(read_write || !refill_before_unpin)
+      << "Only read-write stream support refilling stream after full read.";
+
+  int num_rows = 1024;
+  int buffer_size = 4 * 1024;
+  int max_num_pages = 4;
+  Init(max_num_pages * buffer_size);
+
+  bool got_reservation;
+  RowBatch* write_batch = CreateIntBatch(0, num_rows, false);
+
+  {
+    BufferedTupleStream stream(
+        runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
+    ASSERT_OK(stream.Init("StreamStateTest::TestUnpinAfterFullStreamRead", 
true));
+    if (read_write) {
+      ASSERT_OK(stream.PrepareForReadWrite(attach_on_read, &got_reservation));
+    } else {
+      ASSERT_OK(stream.PrepareForWrite(&got_reservation));
+    }
+    ASSERT_TRUE(got_reservation);
+    RowBatch read_batch(int_desc_, num_rows, &tracker_);
+    int64_t num_rows_written = 0;
+    int64_t num_rows_read = 0;
+
+    // Add rows into the stream until the stream is full.
+    ASSERT_OK(FillUpStream(&stream, write_batch, num_rows_written));
+    int num_pages = max_num_pages;
+    ASSERT_EQ(stream.pages_.size(), num_pages);
+    ASSERT_FALSE(stream.has_read_write_page());
+
+    // Read the entire rows out of the stream.
+    if (!read_write) {
+      ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_reservation));
+      ASSERT_TRUE(got_reservation);
+    }
+    ASSERT_OK(ReadOutStream(&stream, &read_batch, num_rows_read));
+    if (attach_on_read) num_pages = 1;
+    ASSERT_EQ(stream.pages_.size(), num_pages);
+    ASSERT_EQ(stream.has_read_write_page(), read_write);
+
+    if (read_write && refill_before_unpin) {
+      // Fill the stream until it is full again.
+      ASSERT_OK(FillUpStream(&stream, write_batch, num_rows_written));
+      num_pages = max_num_pages;
+      ASSERT_EQ(stream.pages_.size(), num_pages);
+      ASSERT_EQ(stream.has_read_write_page(), !attach_on_read);
+    }
+
+    // Verify that the read page has been fully read before unpinning the 
stream.
+    ASSERT_EQ(
+        stream.read_it_.read_page_rows_returned_, 
stream.read_it_.read_page_->num_rows);
+    // read_page_ should NOT be attached to output batch unless stream is in 
read-only and
+    // attach_on_read mode.
+    bool attached = !read_write && attach_on_read;
+    ASSERT_EQ(stream.read_it_.read_page_->attached_to_output_batch, attached);
+
+    // Verify stream state before UnpinStream.
+    int num_pinned_pages = num_pages;
+    ASSERT_TRUE(stream.is_pinned());
+    if (attached) {
+      // In a pinned + read-only + attach_on_read stream, a fully exhausted 
read page is
+      // automatically unpinned and destroyed, but not yet removed from 
stream.pages_
+      // until the next GetNext() or UnpinStream() call.
+      ASSERT_EQ(stream.pages_.size(), 1);
+      num_pinned_pages = 0;
+    }
+    VerifyStreamState(&stream, num_pages, num_pinned_pages, 0, buffer_size);
+
+    // Unpin the stream.
+    
ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+    ASSERT_FALSE(stream.is_pinned());
+
+    // Verify stream state after UnpinStream. num_pages should remain 
unchanged after
+    // UnpinStream() except for the case of read-only + attach_on_read stream.
+    if (read_write) {
+      if (attach_on_read) {
+        if (refill_before_unpin) {
+          num_pinned_pages = 2;
+          ASSERT_TRUE(stream.pages_.begin()->is_pinned());
+          ASSERT_TRUE(stream.pages_.back().is_pinned());
+        } else {
+          num_pinned_pages = 1;
+          ASSERT_TRUE(stream.pages_.back().is_pinned());
+        }
+      } else {
+        num_pinned_pages = 1;
+        ASSERT_TRUE(stream.pages_.back().is_pinned());
+      }
+    } else {
+      if (attach_on_read) {
+        num_pages = 0;
+      }
+      num_pinned_pages = 0;
+    }
+    int num_unpinned_pages = num_pages - num_pinned_pages;
+    VerifyStreamState(
+        &stream, num_pages, num_pinned_pages, num_unpinned_pages, buffer_size);
+
+    if (read_write) {
+      // Additionally, test that write and read operation still work in 
read-write
+      // stream after UnpinStream.
+      Status status;
+      ASSERT_OK(ReadOutStream(&stream, &read_batch, num_rows_read));
+      for (int i = 0; i < write_batch->num_rows(); ++i) {
+        EXPECT_TRUE(stream.AddRow(write_batch->GetRow(i), &status));
+        ASSERT_OK(status);
+      }
+      ASSERT_OK(ReadOutStream(&stream, &read_batch, num_rows_read));
+      ASSERT_EQ(write_batch->num_rows(), num_rows_read);
+    }
+
+    stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+    read_batch.Reset();
+  }
+  write_batch->Reset();
+}
+
 // Test writing to a stream (AddRow and UnpinStream), even though attached 
pages have not
 // been released yet.
 TEST_F(SimpleTupleStreamTest, WriteAfterReadAttached) {
@@ -1527,7 +1677,7 @@ TEST_F(SimpleTupleStreamTest, ConcurrentReaders) {
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
-TEST_F(SimpleTupleStreamTest, ShortDebugString) {
+void StreamStateTest::TestShortDebugString() {
   Init(BUFFER_POOL_LIMIT);
 
   int num_batches = 50;
@@ -1539,7 +1689,7 @@ TEST_F(SimpleTupleStreamTest, ShortDebugString) {
 
   BufferedTupleStream stream(
       runtime_state_, desc, &client_, default_page_len, max_page_len);
-  ASSERT_OK(stream.Init("SimpleTupleStreamTest::ShortDebugString", true));
+  ASSERT_OK(stream.Init("StreamStateTest::ShortDebugString", true));
   bool got_write_reservation;
   ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
   ASSERT_TRUE(got_write_reservation);
@@ -2022,6 +2172,73 @@ TEST_F(ArrayTupleStreamTest, TestComputeRowSize) {
 
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
+
+Status StreamStateTest::FillUpStream(
+    BufferedTupleStream* stream, RowBatch* write_batch, int64_t& num_inserted) 
{
+  DCHECK(stream->is_pinned());
+  int64_t idx = 0;
+  Status status;
+  num_inserted = 0;
+  while (stream->AddRow(write_batch->GetRow(idx), &status)) {
+    RETURN_IF_ERROR(status);
+    idx = (idx + 1) % write_batch->num_rows();
+    num_inserted++;
+  }
+  return status;
+}
+
+Status StreamStateTest::ReadOutStream(
+    BufferedTupleStream* stream, RowBatch* read_batch, int64_t& num_read) {
+  bool eos = false;
+  num_read = 0;
+  do {
+    read_batch->Reset();
+    RETURN_IF_ERROR(stream->GetNext(read_batch, &eos));
+    num_read += read_batch->num_rows();
+  } while (!eos);
+  return Status::OK();
+}
+
+void StreamStateTest::VerifyStreamState(BufferedTupleStream* stream, int 
num_page,
+    int num_pinned_page, int num_unpinned_page, int buffer_size) {
+  ASSERT_EQ(stream->pages_.size(), num_page);
+  ASSERT_EQ(stream->num_pages_, num_page);
+  ASSERT_EQ(stream->BytesPinned(false), buffer_size * num_pinned_page);
+  ASSERT_EQ(stream->bytes_unpinned(), buffer_size * num_unpinned_page);
+  stream->CheckConsistencyFull(stream->read_it_);
+}
+
+TEST_F(StreamStateTest, DeferAdvancingReadPage) {
+  TestDeferAdvancingReadPage();
+}
+
+TEST_F(StreamStateTest, 
UnpinFullyExhaustedReadPageOnReadWriteStreamNoAttachRefill) {
+  TestUnpinAfterFullStreamRead(true, false, true);
+}
+
+TEST_F(StreamStateTest, 
UnpinFullyExhaustedReadPageOnReadWriteStreamNoAttachNoRefill) {
+  TestUnpinAfterFullStreamRead(true, false, false);
+}
+
+TEST_F(StreamStateTest, 
UnpinFullyExhaustedReadPageOnReadWriteStreamAttachRefill) {
+  TestUnpinAfterFullStreamRead(true, true, true);
+}
+
+TEST_F(StreamStateTest, 
UnpinFullyExhaustedReadPageOnReadWriteStreamAttachNoRefill) {
+  TestUnpinAfterFullStreamRead(true, true, false);
+}
+
+TEST_F(StreamStateTest, UnpinFullyExhaustedReadPageOnReadOnlyStreamAttach) {
+  TestUnpinAfterFullStreamRead(false, true, false);
+}
+
+TEST_F(StreamStateTest, UnpinFullyExhaustedReadPageOnReadOnlyStreamNoAttach) {
+  TestUnpinAfterFullStreamRead(false, false, false);
+}
+
+TEST_F(StreamStateTest, ShortDebugString) {
+  TestShortDebugString();
+}
 }
 
 int main(int argc, char** argv) {
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 01edc84..4330696 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -719,16 +719,24 @@ Status BufferedTupleStream::UnpinStream(UnpinMode mode) {
     bool defer_advancing_read_page = false;
     if (&*read_it_.read_page_ != write_page_ && read_it_.read_page_ != 
pages_.end()
         && read_it_.read_page_rows_returned_ == read_it_.read_page_->num_rows) 
{
-      if (read_it_.read_page_->attached_to_output_batch) {
-        if (num_pages_ <= 2) {
-          // NextReadPage will attempt to save default_page_len_ into write 
reservation if
-          // the stream ended up with only 1 read/write page after advancing 
the read
-          // page. This can potentially lead to negative unused reservation if 
the reader
-          // has not freed the row batch where the read page buffer is 
attached to. We
-          // defer advancing the read page until the next GetNext() call by 
the reader
-          // (see IMPALA-10584).
-          defer_advancing_read_page = true;
-        }
+      if (has_write_iterator_ && read_it_.attach_on_read_
+          && (num_pages_ <= 2 || 
!read_it_.read_page_->attached_to_output_batch)) {
+        // In a read-write stream + attach_on_read mode, there are cases where 
we should
+        // NOT advance the read page even though the page has been fully 
exhausted:
+        //
+        // 1. Stream has exactly 2 pages: 1 read and 1 write.
+        //    NextReadPage() will attempt to save default_page_len_ into write
+        //    reservation if the stream ended up with only 1 read/write page 
after
+        //    advancing the read page. This can potentially lead to negative 
unused
+        //    reservation if the reader has not freed the row batch where the 
read page
+        //    buffer is attached to (see IMPALA-10584).
+        // 2. Read page buffer has not been attached yet to the output row 
batch.
+        //    The previous GetNext() would not attach the read page buffer to 
the output
+        //    row batch if it was a read-write page (see IMPALA-10714).
+        //
+        // We defer advancing the read page for these cases until the next 
GetNext()
+        // call by the reader.
+        defer_advancing_read_page = true;
       }
 
       if (!defer_advancing_read_page) {
@@ -742,7 +750,7 @@ Status BufferedTupleStream::UnpinStream(UnpinMode mode) {
     std::list<Page>::iterator it = pages_.begin();
     if (defer_advancing_read_page) {
       // We skip advancing the read page earlier, so the first page must be a 
read page
-      // and attached_to_output_batch is true. We should keep the first page 
pinned. The
+      // and the reader has not done reading it. We should keep the first page 
pinned. The
       // next GetNext() call is the one who will be responsible to unpin the 
first page.
       DCHECK(read_it_.read_page_ == pages_.begin());
       ++it;
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index bba6479..8546414 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -484,6 +484,7 @@ class BufferedTupleStream {
 
    private:
     friend class BufferedTupleStream;
+    friend class StreamStateTest;
 
     /// True if the read iterator is currently valid
     bool valid_ = false;
@@ -560,8 +561,7 @@ class BufferedTupleStream {
 
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferedTupleStream);
-  friend class SimpleTupleStreamTest_ShortDebugString_Test;
-  friend class SimpleTupleStreamTest_DeferAdvancingReadPage_Test;
+  friend class StreamStateTest;
 
   /// Runtime state instance used to check for cancellation. Not owned.
   RuntimeState* const state_;

Reply via email to