Repository: incubator-impala
Updated Branches:
  refs/heads/master bd6d2df73 -> a3ce5b448


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/buffered-tuple-stream-v2.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.h 
b/be/src/runtime/buffered-tuple-stream-v2.h
index e5fea47..4376c11 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.h
+++ b/be/src/runtime/buffered-tuple-stream-v2.h
@@ -21,6 +21,7 @@
 #include <set>
 #include <vector>
 #include <boost/scoped_ptr.hpp>
+#include <boost/function.hpp>
 
 #include "common/global-types.h"
 #include "common/status.h"
@@ -51,14 +52,14 @@ class TupleRow;
 /// PrepareForReadWrite() is called to initialize both read and write 
iterators to enable
 /// interleaved reads and writes.
 ///
-/// To use write-only mode, PrepareForWrite() is called once and 
AddRow()/AllocateRow()
+/// To use write-only mode, PrepareForWrite() is called once and 
AddRow()/AddRowCustom()
 /// are called repeatedly to initialize then advance a write iterator through 
the stream.
 /// Once the stream is fully written, it can be read back by calling 
PrepareForRead()
 /// then GetNext() repeatedly to advance a read iterator through the stream, 
or by
 /// calling GetRows() to get all of the rows at once.
 ///
 /// To use read/write mode, PrepareForReadWrite() is called once to initialize 
the read
-/// and write iterators. AddRow()/AllocateRow() then advance a write iterator 
through the
+/// and write iterators. AddRow()/AddRowCustom() then advance a write iterator 
through the
 /// stream, and GetNext() advances a trailing read iterator through the stream.
 ///
 /// Buffer management:
@@ -66,19 +67,32 @@ class TupleRow;
 /// the client's reservation to pin pages in memory. It will automatically try 
to
 /// increase the client's reservation whenever it needs to do so to make 
progress.
 ///
+/// Normally pages are all of the same default page length, but larger pages 
up to the
+/// max page length are used if needed to store rows that are too large for a
+/// default-length page.
+///
 /// The stream has both pinned and unpinned modes. In the pinned mode all 
pages are
 /// pinned for reading. The pinned mode avoids I/O by keeping all pages pinned 
in memory
 /// and allows clients to save pointers to rows in the stream and randomly 
access them.
 /// E.g. hash tables can be backed by a BufferedTupleStream. In the unpinned 
mode, only
 /// pages currently being read and written are pinned and other pages are 
unpinned and
-/// therefore do not use the client's reservation and can be spilled to disk.
+/// therefore do not use the client's reservation and can be spilled to disk. 
The stream
+/// always holds onto a default page's worth of reservation for the read and 
write
+/// iterators (i.e. two pages' worth if the stream is in read/write mode), 
even if that
+/// many pages are not currently pinned. This means that UnpinStream() always 
succeeds,
+/// and moving to the next default-length write page or read page on an 
unpinned stream
+/// does not require additional reservation. This is implemented by saving 
reservations
+/// in SubReservations.
 ///
-/// When the stream is in read/write mode, the stream always uses one buffer's 
worth
-/// of reservation of writing and at least one buffer's worth of reservation 
for reading,
-/// even if the same page is currently being read and written. This means that
-/// UnpinStream() always succeeds, and moving to the next write page or read 
page on an
-/// unpinned stream does not require additional reservation.
-/// TODO: IMPALA-3208: variable-length pages will add a caveat here.
+/// To read or write a row larger than the default page size to/from an 
unpinned stream,
+/// the client must have max_page_len - default_page_len unused reservation. 
Writing a
+/// large row to an unpinned stream only uses the reservation for the duration 
of the
+/// AddRow()/AddRowCustom() call. Reading a large row from an unpinned stream 
uses the
+/// reservation until the next call to GetNext(). E.g. to partition a single 
unpinned
+/// stream into n unpinned streams, the reservation needed is (n - 1) *
+/// default_page_len + 2 * max_page_len: one large read buffer and one large 
write
+/// buffer is needed to keep the row being processed in-memory, but only 
default-sized
+/// buffers are needed for the other streams being written.
 ///
 /// The tuple stream also supports a 'delete_on_read' mode, enabled by passing 
a flag
 /// to PrepareForRead() which deletes the stream's pages as it does a final 
read
@@ -90,7 +104,8 @@ class TupleRow;
 /// Page layout:
 /// Rows are stored back to back starting at the first byte of each page's 
buffer, with
 /// no interleaving of data from different rows. There is no padding or 
alignment
-/// between rows.
+/// between rows. Rows larger than the default page length are stored on their 
own
+/// page.
 ///
 /// Tuple row layout:
 /// If the stream's tuples are nullable (i.e. has_nullable_tuple_ is true), 
there is a
@@ -160,8 +175,8 @@ class TupleRow;
 ///     to pin a single write page is required to write to the stream, 
regardless of the
 ///     stream's size.
 ///   2. Pinned: Pages are left pinned. If the next page in the stream cannot 
be pinned
-///     because the caller's reservation is insufficient (and could not be 
increased by
-///     the stream), the read call will fail and the caller can either unpin 
the stream
+///     because the client's reservation is insufficient (and could not be 
increased by
+///     the stream), the read call will fail and the client can either unpin 
the stream
 ///     or free up other memory before retrying.
 ///
 /// Memory lifetime of rows read from stream:
@@ -172,15 +187,15 @@ class TupleRow;
 /// the stream may be freed on the next call to GetNext().
 /// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers 
to the batch.
 ///
-/// Manual construction of rows with AllocateRow():
-/// The BufferedTupleStream supports allocation of uninitialized rows with 
AllocateRow().
-/// AllocateRow() is called instead of AddRow() if the caller wants to 
manually construct
-/// a row. The caller of AllocateRow() is responsible for writing the row with 
exactly the
-/// layout described above.
+/// Manual construction of rows with AddRowCustom():
+/// The BufferedTupleStream supports allocation of uninitialized rows with 
AddRowCustom().
+/// AddRowCustom() is called instead of AddRow() if the client wants to 
manually construct
+/// a row. The caller of AddRowCustom() is responsible for providing a 
callback function
+/// that writes the row with exactly the layout described above.
 ///
 /// If a caller constructs a tuple in this way, the caller can set the 
pointers and they
 /// will not be modified until the stream is read via GetNext() or GetRows().
-/// TODO: IMPALA-5007: try to remove AllocateRow() by unifying with AddRow().
+/// TODO: IMPALA-5007: try to remove AddRowCustom() by unifying with AddRow().
 ///
 /// TODO: we need to be able to do read ahead for pages. We need some way to 
indicate a
 /// page will need to be pinned soon.
@@ -192,10 +207,10 @@ class BufferedTupleStreamV2 {
   /// row_desc: description of rows stored in the stream. This is the desc for 
rows
   /// that are added and the rows being returned.
   /// page_len: the size of pages to use in the stream
-  /// TODO:IMPALA-3208: support a default and maximum page length
   /// ext_varlen_slots: set of varlen slots with data stored externally to the 
stream
   BufferedTupleStreamV2(RuntimeState* state, const RowDescriptor& row_desc,
-      BufferPool::ClientHandle* buffer_pool_client, int64_t page_len,
+      BufferPool::ClientHandle* buffer_pool_client, int64_t default_page_len,
+      int64_t max_page_len,
       const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
 
   virtual ~BufferedTupleStreamV2();
@@ -206,16 +221,17 @@ class BufferedTupleStreamV2 {
   /// 'node_id' is only used for error reporting.
   Status Init(int node_id, bool pinned) WARN_UNUSED_RESULT;
 
-  /// Prepares the stream for writing by attempting to allocate a write 
buffer. Tries to
-  /// increase reservation if there is not enough unused reservation for the 
buffer.
-  /// Called after Init() and before the first AddRow() or AllocateRow() call.
+  /// Prepares the stream for writing by saving enough reservation for a 
default-size
+  /// write page. Tries to increase reservation if there is not enough unused 
reservation
+  /// for a page. Called after Init() and before the first AddRow() or 
AddRowCustom() call.
   /// 'got_reservation': set to true if there was enough reservation to 
initialize the
   ///     first write page and false if there was not enough reservation and 
no other
   ///     error was encountered. Undefined if an error status is returned.
   Status PrepareForWrite(bool* got_reservation) WARN_UNUSED_RESULT;
 
-  /// Prepares the stream for interleaved reads and writes by allocating read 
and write
-  /// buffers. Called after Init() and before the first AddRow() or 
AllocateRow() call.
+  /// Prepares the stream for interleaved reads and writes by saving enough 
reservation
+  /// for default-sized read and write pages. Called after Init() and before 
the first
+  /// AddRow() or AddRowCustom() call.
   /// delete_on_read: Pages are deleted after they are read.
   /// 'got_reservation': set to true if there was enough reservation to 
initialize the
   ///     read and write pages and false if there was not enough reservation 
and no other
@@ -224,7 +240,7 @@ class BufferedTupleStreamV2 {
       bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
 
   /// Prepares the stream for reading, invalidating the write iterator (if 
there is one).
-  /// Therefore must be called after the last AddRow() or AllocateRow() and 
before
+  /// Therefore must be called after the last AddRow() or AddRowCustom() and 
before
   /// GetNext(). PrepareForRead() can be called multiple times to do multiple 
read passes
   /// over the stream, unless PrepareForRead() or PrepareForReadWrite() was 
previously
   /// called with delete_on_read = true.
@@ -237,31 +253,39 @@ class BufferedTupleStreamV2 {
   /// Adds a single row to the stream. There are three possible outcomes:
   /// a) The append succeeds. True is returned.
   /// b) The append fails because the unused reservation was not sufficient to 
add
-  ///   a new page to the stream and the stream could not increase the 
reservation
-  ///   sufficiently. Returns false and sets 'status' to OK. The append can be 
retried
-  ///   after freeing up memory or unpinning the stream.
+  ///   a new page to the stream large enough to fit 'row' and the stream 
could not
+  ///   increase the reservation to get enough unused reservation. Returns 
false and
+  ///   sets 'status' to OK. The append can be retried after freeing up memory 
or
+  ///   unpinning the stream.
   /// c) The append fails with a runtime error. Returns false and sets 
'status' to an
   ///   error.
   /// d) The append fails becase the row is too large to fit in a page of a 
stream.
   ///   Returns false and sets 'status' to an error.
   ///
-  /// Unpinned streams avoid case b) because memory is automatically freed up 
by
-  /// unpinning the current write page.
-  /// TODO: IMPALA-3808: update to reflect behaviour with variable-length pages
+  /// Unpinned streams can only encounter case b) when appending a row larger 
than
+  /// the default page size and the reservation could not be increased 
sufficiently.
+  /// Otherwise enough memory is automatically freed up by unpinning the 
current write
+  /// page.
   ///
   /// BufferedTupleStream will do a deep copy of the memory in the row. After 
AddRow()
   /// returns an error, it should not be called again.
   bool AddRow(TupleRow* row, Status* status) noexcept WARN_UNUSED_RESULT;
 
-  /// Allocates space to store a row of with fixed length 'fixed_size' and 
variable
-  /// length data 'varlen_size'. If successful, returns the pointer where 
fixed length
-  /// data should be stored and assigns 'varlen_data' to where var-len data 
should
-  /// be stored.  AllocateRow does not currently support nullable tuples.
+  /// A function that writes a row to 'data' with the format described in the 
class
+  /// comment.
   ///
-  /// The meaning of the return values are the same as AddRow(), except 
failure is
-  /// indicated by returning NULL instead of false.
-  uint8_t* AllocateRow(
-      int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status);
+  /// Use boost::function instead of std::function because it is better at 
avoiding heap
+  /// allocations when capturing a small number of variables. In GCC 
4.9.2/Boost 1.57,
+  /// boost::function can store up to 3 64-bit pointers without making a heap 
allocation,
+  /// but std::function always makes a heap allocation.
+  typedef boost::function<void(uint8_t* data)> WriteRowFn;
+
+  /// Allocates space to store a row of 'size' bytes (including fixed and 
variable length
+  /// data). If successful, calls 'write_fn' with a pointer to the start of 
the allocated
+  /// space and returns true. Otherwise returns false. The failure modes are 
the same as
+  /// described in the AddRow() comment.
+  ALWAYS_INLINE bool AddRowCustom(
+      int64_t size, const WriteRowFn& write_fn, Status* status);
 
   /// Unflattens 'flat_row' into a regular TupleRow 'row'. Only valid to call 
if the
   /// stream is pinned. The row must have been allocated with the stream's row 
desc.
@@ -341,8 +365,8 @@ class BufferedTupleStreamV2 {
 
   bool is_closed() const { return closed_; }
   bool is_pinned() const { return pinned_; }
-  bool has_read_iterator() const { return read_page_ != pages_.end(); }
-  bool has_write_iterator() const { return write_page_ != nullptr; }
+  bool has_read_iterator() const { return has_read_iterator_; }
+  bool has_write_iterator() const { return has_write_iterator_; }
 
   std::string DebugString() const;
 
@@ -384,9 +408,8 @@ class BufferedTupleStreamV2 {
   /// Description of rows stored in the stream.
   const RowDescriptor& desc_;
 
-  /// Sum of the fixed length portion of all the tuples in desc_, including 
any null
-  /// indicators.
-  int fixed_tuple_row_size_;
+  /// Plan node ID, used for error reporting.
+  int node_id_;
 
   /// The size of the fixed length portion for each tuple in the row.
   std::vector<int> fixed_tuple_sizes_;
@@ -404,21 +427,31 @@ class BufferedTupleStreamV2 {
   BufferPool::ClientHandle* buffer_pool_client_;
 
   /// List of pages in the stream.
-  /// Empty before PrepareForWrite() is called or after the stream has been 
destructively
-  /// read in 'delete_on_read' mode. Non-empty otherwise.
+  /// Empty iff one of two cases applies:
+  /// * before the first row has been added with AddRow() or AddRowCustom().
+  /// * after the stream has been destructively read in 'delete_on_read' mode.
   std::list<Page> pages_;
 
   /// Total size of pages_, including any pages already deleted in 
'delete_on_read'
   /// mode.
   int64_t total_byte_size_;
 
-  /// Iterator pointing to the current page for reading. Equal to list.end() 
when no
-  /// read iterator is active. GetNext() does not advance this past the end of
-  /// the stream, so upon eos 'read_page_' points to the last page and
-  /// rows_returned_ == num_rows_. Always pinned, unless a Pin() call failed 
and an
-  /// error status was returned.
+  /// True if there is currently an active read iterator for the stream.
+  bool has_read_iterator_;
+
+  /// The current page being read. When no read iterator is active, equal to 
list.end().
+  /// When a read iterator is active, either points to the current read page, 
or equals
+  /// list.end() if no rows have yet been read.  GetNext() does not advance 
this past
+  /// the end of the stream, so upon eos 'read_page_' points to the last page 
and
+  /// rows_returned_ == num_rows_. Always pinned, unless a Pin() call failed 
and an error
+  /// status was returned.
   std::list<Page>::iterator read_page_;
 
+  /// Saved reservation for read iterator. 'default_page_len_' reservation is 
saved if
+  /// there is a read iterator, no pinned read page, and the possibility that 
the read
+  /// iterator will advance to a valid page.
+  BufferPool::SubReservation read_page_reservation_;
+
   /// Number of rows returned from the current read_page_.
   uint32_t read_page_rows_returned_;
 
@@ -438,11 +471,23 @@ class BufferedTupleStreamV2 {
   /// PrepareForRead() call.
   int64_t rows_returned_;
 
-  /// The current page for writing. NULL if there is no available page to 
write to.
-  /// Always pinned. If 'read_page_' and 'write_page_' reference the same 
page, then
-  /// that page is only pinned once.
+  /// True if there is currently an active write iterator into the stream.
+  bool has_write_iterator_;
+
+  /// The current page for writing. NULL if there is no write iterator or no 
current
+  /// write page. Always pinned. Size is 'default_page_len_', except 
temporarily while
+  /// appending a larger row in AddRowCustomSlow().
   Page* write_page_;
 
+  /// Saved reservation for write iterator. 'default_page_len_' reservation is 
saved if
+  /// there is a write iterator, no page currently pinned for writing and the 
possibility
+  /// that a pin count will be needed for the write iterator in future. 
Specifically if:
+  /// * no rows have been appended to the stream and 'pages_' is empty, or
+  /// * the stream is unpinned, 'write_page_' is null and the last page in 
'pages_'
+  ///   is a large page that we advanced past, or
+  /// * there is only one pinned page in the stream and it is already pinned 
for reading.
+  BufferPool::SubReservation write_page_reservation_;
+
   /// Total bytes of pinned pages in pages_, stored to avoid iterating over 
the list
   /// to compute it.
   int64_t bytes_pinned_;
@@ -451,9 +496,14 @@ class BufferedTupleStreamV2 {
   /// a destructive 'delete_on_read' pass over the stream.
   int64_t num_rows_;
 
-  /// The length in bytes of pages used to store the stream's rows.
-  /// TODO: IMPALA-3808: support variable-length pages
-  const int64_t page_len_;
+  /// The default length in bytes of pages used to store the stream's rows. 
All rows that
+  /// fit in a default-sized page are stored in a default-sized page.
+  const int64_t default_page_len_;
+
+  /// The maximum length in bytes of pages used to store the stream's rows. 
This is a
+  /// hard limit on the maximum size of row that can be stored in the stream 
and the
+  /// amount of reservation required to read or write to an unpinned stream.
+  const int64_t max_page_len_;
 
   /// Whether any tuple in the rows is nullable.
   const bool has_nullable_tuple_;
@@ -468,53 +518,55 @@ class BufferedTupleStreamV2 {
   bool pinned_;
 
   bool is_read_page(const Page* page) const {
-    return has_read_iterator() && &*read_page_ == page;
+    return read_page_ != pages_.end() && &*read_page_ == page;
   }
 
   bool is_write_page(const Page* page) const { return write_page_ == page; }
 
+  /// Returns true if the read and write pages are the same page.
+  bool has_read_write_page() const {
+    return write_page_ != nullptr && is_read_page(write_page_);
+  }
+
   /// The slow path for AddRow() that is called if there is not sufficient 
space in
   /// the current page.
   bool AddRowSlow(TupleRow* row, Status* status) noexcept;
 
-  /// The slow path for AllocateRow() that is called if there is not 
sufficient space in
+  /// The slow path for AddRowCustom() that is called if there is not 
sufficient space in
   /// the current page.
-  uint8_t* AllocateRowSlow(
-      int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status) 
noexcept;
+  bool AddRowCustomSlow(int64_t size, const WriteRowFn& write_fn, Status* 
status) noexcept;
+
+  /// Copies 'row' into the buffer starting at *data and ending at the byte 
before
+  /// 'data_end'. On success, returns true and updates *data to point after 
the last
+  /// byte written. Returns false if there is not enough space in the buffer 
provided.
+  bool DeepCopy(TupleRow* row, uint8_t** data, const uint8_t* data_end) 
noexcept;
 
-  /// Copies 'row' into write_page_. Returns false if there is not enough 
space in
-  /// 'write_page_'. After returning false, write_ptr_ may be left pointing to 
the
-  /// partially-written row, and no more data can be written to write_page_.
+  /// Templated implementation of DeepCopy().
   template <bool HAS_NULLABLE_TUPLE>
-  bool DeepCopyInternal(TupleRow* row) noexcept;
+  bool DeepCopyInternal(TupleRow* row, uint8_t** data, const uint8_t* 
data_end) noexcept;
 
-  /// Helper function to copy strings in string_slots from tuple into 
write_page_.
-  /// Updates write_ptr_ to the end of the string data added. Returns false if 
the data
-  /// does not fit in the current write page. After returning false, 
write_ptr_ is left
-  /// pointing to the partially-written row, and no more data can be written to
-  /// write_page_.
-  bool CopyStrings(const Tuple* tuple, const std::vector<SlotDescriptor*>& 
string_slots);
+  /// Helper function to copy strings in string_slots from tuple into *data.
+  /// Updates *data to the end of the string data added. Returns false if the 
data
+  /// does not fit in the buffer [*data, data_end).
+  static bool CopyStrings(const Tuple* tuple,
+      const std::vector<SlotDescriptor*>& string_slots, uint8_t** data,
+      const uint8_t* data_end);
 
   /// Helper function to deep copy collections in collection_slots from tuple 
into
-  /// write_page_. Updates write_ptr_ to the end of the collection data added. 
Returns
-  /// false if the data does not fit in the current write page. After 
returning false,
-  /// write_ptr_ is left pointing to the partially-written row, and no more 
data can be
-  /// written to write_page_.
-  bool CopyCollections(
-      const Tuple* tuple, const std::vector<SlotDescriptor*>& 
collection_slots);
-
-  /// Wrapper of the templated DeepCopyInternal() function.
-  bool DeepCopy(TupleRow* row) noexcept;
-
-  /// Gets a new page of 'page_len_' bytes from buffer_pool_, updating 
write_page_,
-  /// write_ptr_ and write_end_ptr_. The caller must ensure there is 
sufficient unused
-  /// reservation to allocate the page. The caller must reset the write 
iterator (if
-  /// there is one).
-  Status NewWritePage() noexcept WARN_UNUSED_RESULT;
-
-  /// Validates that a page can fit a row of 'row_size' bytes.
+  /// the buffer [*data, data_end). Updates *data to the end of the collection 
data
+  /// added. Returns false if the data does not fit in the buffer.
+  static bool CopyCollections(const Tuple* tuple,
+      const std::vector<SlotDescriptor*>& collection_slots, uint8_t** data,
+      const uint8_t* data_end);
+
+  /// Gets a new page of 'page_len' bytes from buffer_pool_, updating 
write_page_,
+  /// write_ptr_ and write_end_ptr_. The caller must ensure there is 
'page_len' unused
+  /// reservation. The caller must reset the write page (if there is one) 
before calling.
+  Status NewWritePage(int64_t page_len) noexcept WARN_UNUSED_RESULT;
+
+  /// Determines what page size is needed to fit a row of 'row_size' bytes.
   /// Returns an error if the row cannot fit in a page.
-  Status CheckPageSizeForRow(int64_t row_size);
+  Status CalcPageLenForRow(int64_t row_size, int64_t* page_len);
 
   /// Wrapper around NewWritePage() that allocates a new write page that fits 
a row of
   /// 'row_size' bytes. Increases reservation if needed to allocate the next 
page.
@@ -525,9 +577,14 @@ class BufferedTupleStreamV2 {
   Status AdvanceWritePage(
       int64_t row_size, bool* got_reservation) noexcept WARN_UNUSED_RESULT;
 
-  /// Reset the write page, if there is one, and unpin pages accordingly.
+  /// Reset the write page, if there is one, and unpin pages accordingly. If 
there
+  /// is an active write iterator, the next row will be appended to a new page.
   void ResetWritePage();
 
+  /// Invalidate the write iterator and release any resources associated with 
it. After
+  /// calling this, no more rows can be appended to the stream.
+  void InvalidateWriteIterator();
+
   /// Same as PrepareForRead(), except the iterators are not invalidated and
   /// the caller is assumed to have checked there is sufficient unused 
reservation.
   Status PrepareForReadInternal(bool delete_on_read) WARN_UNUSED_RESULT;
@@ -537,8 +594,9 @@ class BufferedTupleStreamV2 {
   /// read_page_rows_returned_.
   Status NextReadPage() WARN_UNUSED_RESULT;
 
-  /// Reset the read page, if there is one, and unpin pages accordingly.
-  void ResetReadPage();
+  /// Invalidate the read iterator, and release any resources associated with 
the active
+  /// iterator.
+  void InvalidateReadIterator();
 
   /// Returns the total additional bytes that this row will consume in 
write_page_ if
   /// appended to the page. This includes the row's null indicators, the fixed 
length
@@ -565,6 +623,36 @@ class BufferedTupleStreamV2 {
   /// read and write pages and whether the stream is pinned.
   int ExpectedPinCount(bool stream_pinned, const Page* page) const;
 
+  /// Return true if the stream in its current state needs to have a 
reservation for
+  /// a write page stored in 'write_page_reservation_'.
+  bool NeedWriteReservation() const;
+
+  /// Same as above, except assume the stream's 'pinned_' state is 
'stream_pinned'.
+  bool NeedWriteReservation(bool stream_pinned) const;
+
+  /// Same as above, except assume the stream has 'num_pages' pages and 
a different
+  /// iterator state.
+  static bool NeedWriteReservation(bool stream_pinned, int64_t num_pages,
+      bool has_write_iterator, bool has_write_page, bool has_read_write_page);
+
+  /// Return true if the stream in its current state needs to have a 
reservation for
+  /// a read page stored in 'read_page_reservation_'.
+  bool NeedReadReservation() const;
+
+  /// Same as above, except assume the stream's 'pinned_' state is 
'stream_pinned'.
+  bool NeedReadReservation(bool stream_pinned) const;
+
+  /// Same as above, except assume the stream has 'num_pages' pages and a 
different
+  /// read iterator state.
+  bool NeedReadReservation(bool stream_pinned, int64_t num_pages, bool 
has_read_iterator,
+      bool has_read_page) const;
+
+  /// Same as above, except assume the stream has 'num_pages' pages and a 
different
+  /// write iterator state.
+  static bool NeedReadReservation(bool stream_pinned, int64_t num_pages,
+      bool has_read_iterator, bool has_read_page, bool has_write_iterator,
+      bool has_write_page);
+
   /// Templated GetNext implementations.
   template <bool FILL_FLAT_ROWS>
   Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* 
flat_rows);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/buffered-tuple-stream-v2.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.inline.h 
b/be/src/runtime/buffered-tuple-stream-v2.inline.h
index 6ad4bc4..a3b219c 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.inline.h
+++ b/be/src/runtime/buffered-tuple-stream-v2.inline.h
@@ -31,31 +31,23 @@ inline int 
BufferedTupleStreamV2::NullIndicatorBytesPerRow() const {
   return BitUtil::RoundUpNumBytes(fixed_tuple_sizes_.size());
 }
 
-inline bool BufferedTupleStreamV2::AddRow(TupleRow* row, Status* status) 
noexcept {
+inline bool BufferedTupleStreamV2::AddRowCustom(
+    int64_t size, const WriteRowFn& write_fn, Status* status) {
   DCHECK(!closed_);
-  if (LIKELY(DeepCopy(row))) return true;
-  return AddRowSlow(row, status);
-}
-
-inline uint8_t* BufferedTupleStreamV2::AllocateRow(
-    int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status) {
-  DCHECK(!closed_);
-  DCHECK(!has_nullable_tuple_) << "AllocateRow does not support nullable 
tuples";
-  const int total_size = fixed_size + varlen_size;
-  if (UNLIKELY(write_page_ == nullptr || write_ptr_ + total_size > 
write_end_ptr_)) {
-    return AllocateRowSlow(fixed_size, varlen_size, varlen_data, status);
+  DCHECK(has_write_iterator());
+  if (UNLIKELY(write_page_ == nullptr || write_ptr_ + size > write_end_ptr_)) {
+    return AddRowCustomSlow(size, write_fn, status);
   }
   DCHECK(write_page_ != nullptr);
   DCHECK(write_page_->is_pinned());
-  DCHECK_LE(write_ptr_ + total_size, write_end_ptr_);
+  DCHECK_LE(write_ptr_ + size, write_end_ptr_);
   ++num_rows_;
   ++write_page_->num_rows;
 
-  uint8_t* fixed_data = write_ptr_;
-  write_ptr_ += fixed_size;
-  *varlen_data = write_ptr_;
-  write_ptr_ += varlen_size;
-  return fixed_data;
+  uint8_t* data = write_ptr_;
+  write_ptr_ += size;
+  write_fn(data);
+  return true;
 }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc 
b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 6185e36..1fb60ed 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -1888,6 +1888,39 @@ void BufferPoolTest::TestRandomInternalImpl(BufferPool* 
pool, FileGroup* file_gr
   for (auto& buffer : buffers) pool->FreeBuffer(&client, &buffer.first);
   pool->DeregisterClient(&client);
 }
+
+/// Test basic SubReservation functionality.
+TEST_F(BufferPoolTest, SubReservation) {
+  const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 10;
+  global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool::ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, 
NULL,
+      TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservationToFit(TEST_BUFFER_LEN));
+
+  BufferPool::SubReservation subreservation(&client);
+  BufferPool::BufferHandle buffer;
+  // Save and check that the reservation moved as expected.
+  client.SaveReservation(&subreservation, TEST_BUFFER_LEN);
+  EXPECT_EQ(0, client.GetUnusedReservation());
+  EXPECT_EQ(TEST_BUFFER_LEN, subreservation.GetReservation());
+
+  // Should not be able to allocate from client since the reservation was 
moved.
+  IMPALA_ASSERT_DEBUG_DEATH(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN), 
"");
+
+  // Restore and check that the reservation moved as expected.
+  client.RestoreReservation(&subreservation, TEST_BUFFER_LEN);
+  EXPECT_EQ(TEST_BUFFER_LEN, client.GetUnusedReservation());
+  EXPECT_EQ(0, subreservation.GetReservation());
+
+  // Should be able to allocate from the client after restoring.
+  ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
+  EXPECT_EQ(TEST_BUFFER_LEN, client.GetUnusedReservation());
+
+  subreservation.Close();
+  pool.DeregisterClient(&client);
+}
 }
 
 int main(int argc, char** argv) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc 
b/be/src/runtime/bufferpool/buffer-pool.cc
index 5593a41..9b16112 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -308,6 +308,40 @@ int64_t BufferPool::ClientHandle::GetUnusedReservation() const {
   return impl_->reservation()->GetUnusedReservation();
 }
 
+void BufferPool::ClientHandle::SaveReservation(SubReservation* dst, int64_t bytes) {
+  DCHECK_EQ(dst->tracker_->parent(), impl_->reservation());
+  bool success = impl_->reservation()->TransferReservationTo(dst->tracker_.get(), bytes);
+  DCHECK(success); // SubReservation should not have a limit, so this shouldn't fail.
+}
+
+void BufferPool::ClientHandle::RestoreReservation(SubReservation* src, int64_t bytes) {
+  DCHECK_EQ(src->tracker_->parent(), impl_->reservation());
+  bool success = src->tracker_->TransferReservationTo(impl_->reservation(), bytes);
+  DCHECK(success); // Transferring reservation to parent shouldn't fail.
+}
+
+BufferPool::SubReservation::SubReservation(ClientHandle* client) {
+  tracker_.reset(new ReservationTracker);
+  tracker_->InitChildTracker(
+      nullptr, client->impl_->reservation(), nullptr, numeric_limits<int64_t>::max());
+}
+
+BufferPool::SubReservation::~SubReservation() { DCHECK(is_closed()); }
+
+int64_t BufferPool::SubReservation::GetReservation() const {
+  return is_closed() ? 0 : tracker_->GetReservation(); // Close() returned everything.
+}
+
+void BufferPool::SubReservation::Close() {
+  if (is_closed()) return;
+  // Hand any reservation still held here back to the client's tracker.
+  const int64_t bytes_held = tracker_->GetReservation();
+  const bool ok = tracker_->TransferReservationTo(tracker_->parent(), bytes_held);
+  DCHECK(ok); // Transferring reservation to parent shouldn't fail.
+  tracker_->Close();
+  tracker_.reset();
+}
+
 BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
     const string& name, ReservationTracker* parent_reservation, MemTracker* 
mem_tracker,
     int64_t reservation_limit, RuntimeProfile* profile)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 93c08bd..f2ff99b 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -151,6 +151,7 @@ class BufferPool : public CacheLineAligned {
   class BufferHandle;
   class ClientHandle;
   class PageHandle;
+  class SubReservation;
 
   /// Constructs a new buffer pool.
   /// 'min_buffer_len': the minimum buffer length for the pool. Must be a power of two.
@@ -323,6 +324,14 @@ class BufferPool::ClientHandle {
   /// if successful, after which 'bytes' can be used.
   bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT;
 
+  /// Move 'bytes' of this client's unused reservation to 'dst', which must have been
+  /// created for this client. 'bytes' of unused reservation must be available.
+  void SaveReservation(SubReservation* dst, int64_t bytes);
+
+  /// Move 'bytes' of reservation from 'src' back to this client. 'src' must have been
+  /// created for this client and hold at least 'bytes' of unused reservation.
+  void RestoreReservation(SubReservation* src, int64_t bytes);
+
   /// Accessors for this client's reservation corresponding to the identically-named
   /// methods in ReservationTracker.
   int64_t GetReservation() const;
@@ -336,6 +345,7 @@ class BufferPool::ClientHandle {
  private:
   friend class BufferPool;
   friend class BufferPoolTest;
+  friend class SubReservation;
   DISALLOW_COPY_AND_ASSIGN(ClientHandle);
 
   /// Internal state for the client. NULL means the client isn't registered.
@@ -343,6 +353,31 @@ class BufferPool::ClientHandle {
   Client* impl_;
 };
 
+/// Helper class that allows dividing up a client's reservation into separate buckets.
+class BufferPool::SubReservation {
+ public:
+  explicit SubReservation(ClientHandle* client);
+  ~SubReservation();
+
+  /// Returns the amount of reservation stored in this sub-reservation.
+  int64_t GetReservation() const;
+
+  /// Releases the subreservation to the client's tracker. Must be called before
+  /// destruction.
+  void Close();
+
+  bool is_closed() const { return tracker_ == nullptr; } // True once Close() has run.
+
+ private:
+  friend class BufferPool::ClientHandle;
+  DISALLOW_COPY_AND_ASSIGN(SubReservation);
+
+  /// Child of the client's tracker used to track the sub-reservation. Usage is not
+  /// tracked against this tracker - instead the reservation is always transferred back
+  /// to the client's tracker before use.
+  boost::scoped_ptr<ReservationTracker> tracker_;
+};
+
 /// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only
 /// be used by a single thread at a time: concurrently calling BufferHandle methods or
 /// BufferPool methods with the BufferHandle as an argument is not supported.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index a794a91..4197340 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -359,7 +359,7 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
       ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
     }
     // Return the reservation to the root before the next iteration.
-    reservations[level].TransferReservationTo(&reservations[0], amount);
+    ASSERT_TRUE(reservations[level].TransferReservationTo(&reservations[0], 
amount));
   }
 
   for (int i = HIERARCHY_DEPTH - 1; i >= 0; --i) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h
index 5be2c23..4d525c0 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -134,7 +134,7 @@ class ReservationTracker {
   /// (because linked MemTrackers with limits below the query level are not supported).
   /// Returns true on success or false if the transfer would have caused a reservation
   /// limit to be exceeded.
-  bool TransferReservationTo(ReservationTracker* other, int64_t bytes);
+  bool TransferReservationTo(ReservationTracker* other, int64_t bytes) WARN_UNUSED_RESULT;
 
   /// Allocate 'bytes' from the reservation. The tracker must have at least 'bytes'
   /// unused reservation before calling this method.
@@ -159,6 +159,8 @@ class ReservationTracker {
   /// Returns the total reservations of children in bytes.
   int64_t GetChildReservations();
 
+  ReservationTracker* parent() const { return this->parent_; } // Tracker this one is under.
+
   std::string DebugString();
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 8611295..bccb779 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -319,6 +319,10 @@ error_codes = (
   ("KUDU_TIMESTAMP_OUT_OF_RANGE", 103,
    "Kudu table '$0' column '$1' contains an out of range timestamp. "
    "The valid date range is 1400-01-01..9999-12-31."),
+
+  # TODO: IMPALA-3200: make sure that this references the correct query option.
+  ("MAX_ROW_SIZE", 104, "Row of size $0 could not be materialized in plan node 
with "
+    "id $1. Limit is $2, which can be increased with query option 
max_row_size"),
 )
 
 import sys


Reply via email to