Repository: incubator-impala
Updated Branches:
  refs/heads/master bc1feb34d -> 07d3cea1f


IMPALA-5618: buffered-tuple-stream-v2 fixes

This fixes two issues:
* AddRowCustom() caused a performance regression when the boost::function
  write callback passed to it had to heap-allocate its captured state. This
  is solved by splitting the API into two separate calls. This imposes an
  additional burden on the caller, but the performance of the two-call API
  is easier to reason about.
* Allow re-reading streams with 'delete_on_read_' set, so long as no rows
  were read from the stream. This is necessary for some spilling ExecNodes
  that prepare the stream for reading in order to acquire the buffer,
  but then need to spill the stream to free memory before they are
  actually able to read it.

Change-Id: Ibab0d774f66be632f17376a56abf302821cca047
Reviewed-on: http://gerrit.cloudera.org:8080/7358
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/081ecf01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/081ecf01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/081ecf01

Branch: refs/heads/master
Commit: 081ecf01526449c2360d2d702afc1488b57e07fb
Parents: bc1feb3
Author: Tim Armstrong <[email protected]>
Authored: Wed Jul 5 17:55:58 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Jul 7 08:15:59 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/buffered-tuple-stream-v2-test.cc | 18 +++--
 be/src/runtime/buffered-tuple-stream-v2.cc      | 51 +++++++-------
 be/src/runtime/buffered-tuple-stream-v2.h       | 72 +++++++++++---------
 .../runtime/buffered-tuple-stream-v2.inline.h   | 12 ++--
 4 files changed, 81 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/081ecf01/be/src/runtime/buffered-tuple-stream-v2-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2-test.cc 
b/be/src/runtime/buffered-tuple-stream-v2-test.cc
index 277a564..7e4cef8 100644
--- a/be/src/runtime/buffered-tuple-stream-v2-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2-test.cc
@@ -794,12 +794,11 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
     for (int j = 0; j < batch->num_rows(); ++j) {
       int fixed_size = tuple_desc.byte_size();
       // Copy fixed portion in, but leave it pointing to row batch's varlen 
data.
-      ASSERT_TRUE(stream.AddRowCustom(fixed_size,
-          [batch, fixed_size, j](uint8_t* tuple_data) {
-            memcpy(tuple_data, batch->GetRow(j)->GetTuple(0), fixed_size);
-          },
-          &status));
+      uint8_t* tuple_data = stream.AddRowCustomBegin(fixed_size, &status);
+      ASSERT_TRUE(tuple_data != nullptr);
       ASSERT_TRUE(status.ok());
+      memcpy(tuple_data, batch->GetRow(j)->GetTuple(0), fixed_size);
+      stream.AddRowCustomEnd(fixed_size);
     }
     rows_added += batch->num_rows();
   }
@@ -1125,12 +1124,11 @@ TEST_F(MultiTupleStreamTest, MultiTupleAddRowCustom) {
         fixed_size += tuple_desc->byte_size();
         varlen_size += row->GetTuple(k)->VarlenByteSize(*tuple_desc);
       }
-      ASSERT_TRUE(stream.AddRowCustom(fixed_size + varlen_size,
-          [this, row, fixed_size, varlen_size](uint8_t* data) {
-            WriteStringRow(string_desc_, row, fixed_size, varlen_size, data);
-          },
-          &status));
+      uint8_t* data = stream.AddRowCustomBegin(fixed_size + varlen_size, 
&status);
+      ASSERT_TRUE(data != nullptr);
       ASSERT_TRUE(status.ok());
+      WriteStringRow(string_desc_, row, fixed_size, varlen_size, data);
+      stream.AddRowCustomEnd(fixed_size + varlen_size);
     }
     rows_added += batch->num_rows();
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/081ecf01/be/src/runtime/buffered-tuple-stream-v2.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc 
b/be/src/runtime/buffered-tuple-stream-v2.cc
index 82da2bc..90d9c12 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2.cc
@@ -540,6 +540,9 @@ void BufferedTupleStreamV2::InvalidateReadIterator() {
   if (read_page_reservation_.GetReservation() > 0) {
     buffer_pool_client_->RestoreReservation(&read_page_reservation_, 
default_page_len_);
   }
+  // It is safe to re-read a delete-on-read stream if no rows were read and no 
pages
+  // were therefore deleted.
+  if (rows_returned_ == 0) delete_on_read_ = false;
 }
 
 Status BufferedTupleStreamV2::PrepareForRead(bool delete_on_read, bool* 
got_reservation) {
@@ -863,39 +866,41 @@ int64_t BufferedTupleStreamV2::ComputeRowSize(TupleRow* 
row) const noexcept {
 }
 
 bool BufferedTupleStreamV2::AddRowSlow(TupleRow* row, Status* status) noexcept 
{
-  // Use AddRowCustomSlow() to do the work of advancing the page.
+  // Use AddRowCustom*() to do the work of advancing the page.
   int64_t row_size = ComputeRowSize(row);
-  return AddRowCustomSlow(row_size,
-      [this, row, row_size](uint8_t* data) {
-        bool success = DeepCopy(row, &data, data + row_size);
-        DCHECK(success);
-        DCHECK_EQ(data, write_ptr_);
-      },
-      status);
+  uint8_t* data = AddRowCustomBeginSlow(row_size, status);
+  if (data == nullptr) return false;
+  bool success = DeepCopy(row, &data, data + row_size);
+  DCHECK(success);
+  DCHECK_EQ(data, write_ptr_);
+  AddRowCustomEnd(row_size);
+  return true;
 }
 
-bool BufferedTupleStreamV2::AddRowCustomSlow(
-    int64_t size, const WriteRowFn& write_fn, Status* status) noexcept {
+uint8_t* BufferedTupleStreamV2::AddRowCustomBeginSlow(
+    int64_t size, Status* status) noexcept {
   bool got_reservation;
   *status = AdvanceWritePage(size, &got_reservation);
-  if (!status->ok() || !got_reservation) return false;
+  if (!status->ok() || !got_reservation) return nullptr;
 
   // We have a large-enough page so now success is guaranteed.
-  bool result = AddRowCustom(size, write_fn, status);
-  DCHECK(result);
-  if (size > default_page_len_) {
-    // Immediately unpin the large write page so that we're not using up extra 
reservation
-    // and so we don't append another row to the page.
-    ResetWritePage();
-    // Save some of the reservation we freed up so we can create the next 
write page when
-    // needed.
-    if (NeedWriteReservation()) {
-      buffer_pool_client_->SaveReservation(&write_page_reservation_, 
default_page_len_);
-    }
+  uint8_t* result = AddRowCustomBegin(size, status);
+  DCHECK(result != nullptr);
+  return result;
+}
+
+void BufferedTupleStreamV2::AddLargeRowCustomEnd(int64_t size) noexcept {
+  DCHECK_GT(size, default_page_len_);
+  // Immediately unpin the large write page so that we're not using up extra 
reservation
+  // and so we don't append another row to the page.
+  ResetWritePage();
+  // Save some of the reservation we freed up so we can create the next write 
page when
+  // needed.
+  if (NeedWriteReservation()) {
+    buffer_pool_client_->SaveReservation(&write_page_reservation_, 
default_page_len_);
   }
   // The stream should be in a consistent state once the row is added.
   CHECK_CONSISTENCY();
-  return true;
 }
 
 bool BufferedTupleStreamV2::AddRow(TupleRow* row, Status* status) noexcept {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/081ecf01/be/src/runtime/buffered-tuple-stream-v2.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.h 
b/be/src/runtime/buffered-tuple-stream-v2.h
index c06dc6c..1f21235 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.h
+++ b/be/src/runtime/buffered-tuple-stream-v2.h
@@ -52,15 +52,15 @@ class TupleRow;
 /// PrepareForReadWrite() is called to initialize both read and write 
iterators to enable
 /// interleaved reads and writes.
 ///
-/// To use write-only mode, PrepareForWrite() is called once and 
AddRow()/AddRowCustom()
+/// To use write-only mode, PrepareForWrite() is called once and 
AddRow()/AddRowCustom*()
 /// are called repeatedly to initialize then advance a write iterator through 
the stream.
 /// Once the stream is fully written, it can be read back by calling 
PrepareForRead()
 /// then GetNext() repeatedly to advance a read iterator through the stream, 
or by
 /// calling GetRows() to get all of the rows at once.
 ///
 /// To use read/write mode, PrepareForReadWrite() is called once to initialize 
the read
-/// and write iterators. AddRow()/AddRowCustom() then advance a write iterator 
through the
-/// stream, and GetNext() advances a trailing read iterator through the stream.
+/// and write iterators. AddRow()/AddRowCustom*() then advance a write 
iterator through
+/// the stream, and GetNext() advances a trailing read iterator through the 
stream.
 ///
 /// Buffer management:
 /// The tuple stream is backed by a sequence of BufferPool Pages. The tuple 
stream uses
@@ -87,7 +87,7 @@ class TupleRow;
 /// To read or write a row larger than the default page size to/from an 
unpinned stream,
 /// the client must have max_page_len - default_page_len unused reservation. 
Writing a
 /// large row to an unpinned stream only uses the reservation for the duration 
of the
-/// AddRow()/AddRowCustom() call. Reading a large row from an unpinned stream 
uses the
+/// AddRow()/AddRowCustom*() call. Reading a large row from an unpinned stream 
uses the
 /// reservation until the next call to GetNext(). E.g. to partition a single 
unpinned
 /// stream into n unpinned streams, the reservation needed is (n - 1) *
 /// default_page_len + 2 * max_page_len: one large read buffer and one large 
write
@@ -187,15 +187,16 @@ class TupleRow;
 /// the stream may be freed on the next call to GetNext().
 /// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers 
to the batch.
 ///
-/// Manual construction of rows with AddRowCustom():
-/// The BufferedTupleStream supports allocation of uninitialized rows with 
AddRowCustom().
-/// AddRowCustom() is called instead of AddRow() if the client wants to 
manually construct
-/// a row. The caller of AddRowCustom() is responsible for providing a 
callback function
-/// that writes the row with exactly the layout described above.
+/// Manual construction of rows with AddRowCustomBegin()/AddRowCustomEnd():
+/// The BufferedTupleStream supports allocation of uninitialized rows with
+/// AddRowCustom*(). AddRowCustomBegin() is called instead of AddRow() if the 
client wants
+/// to manually construct a row. The caller of AddRowCustomBegin() is 
responsible for
+/// writing the row with exactly the layout described above then calling
+/// AddRowCustomEnd() when done.
 ///
 /// If a caller constructs a tuple in this way, the caller can set the 
pointers and they
 /// will not be modified until the stream is read via GetNext() or GetRows().
-/// TODO: IMPALA-5007: try to remove AddRowCustom() by unifying with AddRow().
+/// TODO: IMPALA-5007: try to remove AddRowCustom*() by unifying with AddRow().
 ///
 /// TODO: we need to be able to do read ahead for pages. We need some way to 
indicate a
 /// page will need to be pinned soon.
@@ -223,7 +224,8 @@ class BufferedTupleStreamV2 {
 
   /// Prepares the stream for writing by saving enough reservation for a 
default-size
   /// write page. Tries to increase reservation if there is not enough unused 
reservation
-  /// for a page. Called after Init() and before the first AddRow() or 
AddRowCustom() call.
+  /// for a page. Called after Init() and before the first AddRow() or
+  /// AddRowCustomBegin() call.
   /// 'got_reservation': set to true if there was enough reservation to 
initialize the
   ///     first write page and false if there was not enough reservation and 
no other
   ///     error was encountered. Undefined if an error status is returned.
@@ -231,8 +233,8 @@ class BufferedTupleStreamV2 {
 
   /// Prepares the stream for interleaved reads and writes by saving enough 
reservation
   /// for default-sized read and write pages. Called after Init() and before 
the first
-  /// AddRow() or AddRowCustom() call.
-  /// delete_on_read: Pages are deleted after they are read.
+  /// AddRow() or AddRowCustomBegin() call.
+  /// 'delete_on_read': Pages are deleted after they are read.
   /// 'got_reservation': set to true if there was enough reservation to 
initialize the
   ///     read and write pages and false if there was not enough reservation 
and no other
   ///     error was encountered. Undefined if an error status is returned.
@@ -240,11 +242,11 @@ class BufferedTupleStreamV2 {
       bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
 
   /// Prepares the stream for reading, invalidating the write iterator (if 
there is one).
-  /// Therefore must be called after the last AddRow() or AddRowCustom() and 
before
+  /// Therefore must be called after the last AddRow() or AddRowCustomEnd() 
and before
   /// GetNext(). PrepareForRead() can be called multiple times to do multiple 
read passes
-  /// over the stream, unless PrepareForRead() or PrepareForReadWrite() was 
previously
-  /// called with delete_on_read = true.
-  /// delete_on_read: Pages are deleted after they are read.
+  /// over the stream, unless rows were read from the stream after 
PrepareForRead() or
+  /// PrepareForReadWrite() was called with delete_on_read = true.
+  /// 'delete_on_read': Pages are deleted after they are read.
   /// 'got_reservation': set to true if there was enough reservation to 
initialize the
   ///     first read page and false if there was not enough reservation and no 
other
   ///     error was encountered. Undefined if an error status is returned.
@@ -271,21 +273,19 @@ class BufferedTupleStreamV2 {
   /// returns an error, it should not be called again.
   bool AddRow(TupleRow* row, Status* status) noexcept WARN_UNUSED_RESULT;
 
-  /// A function that writes a row to 'data' with the format described in the 
class
-  /// comment.
+  /// Allocates space to store a row of 'size' bytes (including fixed and 
variable length
+  /// data). If successful, returns a pointer to the allocated row. The caller 
then must
write valid data to the row and call AddRowCustomEnd().
   ///
-  /// Use boost::function instead of std::function because it is better at 
avoiding heap
-  /// allocations when capturing a small number of variables. In GCC 
4.9.2/Boost 1.57,
-  /// boost::function can store up to 3 64-bit pointers without making a heap 
allocation,
-  /// but std::function always makes a heap allocation.
-  typedef boost::function<void(uint8_t* data)> WriteRowFn;
+  /// If unsuccessful, returns nullptr. The failure modes are the same as 
described in the
+  /// AddRow() comment.
+  ALWAYS_INLINE uint8_t* AddRowCustomBegin(int64_t size, Status* status);
 
-  /// Allocates space to store a row of 'size' bytes (including fixed and 
variable length
-  /// data). If successful, calls 'write_fn' with a pointer to the start of 
the allocated
-  /// space and returns true. Otherwise returns false. The failure modes are 
the same as
-  /// described in the AddRow() comment.
-  ALWAYS_INLINE bool AddRowCustom(
-      int64_t size, const WriteRowFn& write_fn, Status* status);
+  /// Called after AddRowCustomBegin() when done writing the row. Should only 
be called
+  /// if AddRowCustomBegin() succeeded. See the AddRowCustomBegin() comment for
+  /// explanation.
+  /// 'size': the size passed into AddRowCustomBegin().
+  void AddRowCustomEnd(int64_t size);
 
   /// Unflattens 'flat_row' into a regular TupleRow 'row'. Only valid to call 
if the
   /// stream is pinned. The row must have been allocated with the stream's row 
desc.
@@ -476,7 +476,7 @@ class BufferedTupleStreamV2 {
 
   /// The current page for writing. NULL if there is no write iterator or no 
current
   /// write page. Always pinned. Size is 'default_page_len_', except 
temporarily while
-  /// appending a larger row in AddRowCustomSlow().
+  /// appending a larger row between AddRowCustomBegin() and AddRowCustomEnd().
   Page* write_page_;
 
   /// Saved reservation for write iterator. 'default_page_len_' reservation is 
saved if
@@ -508,7 +508,8 @@ class BufferedTupleStreamV2 {
   /// Whether any tuple in the rows is nullable.
   const bool has_nullable_tuple_;
 
-  /// If true, pages are deleted after they are read.
+  /// If true, pages are deleted after they are read during this read pass. 
Once rows
+  /// have been read from a stream with 'delete_on_read_' true, this is always 
true.
   bool delete_on_read_;
 
   bool closed_; // Used for debugging.
@@ -532,9 +533,12 @@ class BufferedTupleStreamV2 {
   /// the current page.
   bool AddRowSlow(TupleRow* row, Status* status) noexcept;
 
-  /// The slow path for AddRowCustom() that is called if there is not 
sufficient space in
+  /// The slow path for AddRowCustomBegin() that is called if there is not 
sufficient space in
   /// the current page.
-  bool AddRowCustomSlow(int64_t size, const WriteRowFn& write_fn, Status* 
status) noexcept;
+  uint8_t* AddRowCustomBeginSlow(int64_t size, Status* status) noexcept;
+
+  /// The slow path for AddRowCustomEnd() that is called for large pages.
+  void AddLargeRowCustomEnd(int64_t size) noexcept;
 
   /// Copies 'row' into the buffer starting at *data and ending at the byte 
before
   /// 'data_end'. On success, returns true and updates *data to point after 
the last

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/081ecf01/be/src/runtime/buffered-tuple-stream-v2.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.inline.h 
b/be/src/runtime/buffered-tuple-stream-v2.inline.h
index a3b219c..7022249 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.inline.h
+++ b/be/src/runtime/buffered-tuple-stream-v2.inline.h
@@ -31,12 +31,11 @@ inline int 
BufferedTupleStreamV2::NullIndicatorBytesPerRow() const {
   return BitUtil::RoundUpNumBytes(fixed_tuple_sizes_.size());
 }
 
-inline bool BufferedTupleStreamV2::AddRowCustom(
-    int64_t size, const WriteRowFn& write_fn, Status* status) {
+inline uint8_t* BufferedTupleStreamV2::AddRowCustomBegin(int64_t size, Status* 
status) {
   DCHECK(!closed_);
   DCHECK(has_write_iterator());
   if (UNLIKELY(write_page_ == nullptr || write_ptr_ + size > write_end_ptr_)) {
-    return AddRowCustomSlow(size, write_fn, status);
+    return AddRowCustomBeginSlow(size, status);
   }
   DCHECK(write_page_ != nullptr);
   DCHECK(write_page_->is_pinned());
@@ -46,8 +45,11 @@ inline bool BufferedTupleStreamV2::AddRowCustom(
 
   uint8_t* data = write_ptr_;
   write_ptr_ += size;
-  write_fn(data);
-  return true;
+  return data;
+}
+
+inline void BufferedTupleStreamV2::AddRowCustomEnd(int64_t size) {
+  if (UNLIKELY(size > default_page_len_)) AddLargeRowCustomEnd(size);
 }
 }
 

Reply via email to