Repository: incubator-impala Updated Branches: refs/heads/master bd6d2df73 -> a3ce5b448
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/buffered-tuple-stream-v2.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-v2.h b/be/src/runtime/buffered-tuple-stream-v2.h index e5fea47..4376c11 100644 --- a/be/src/runtime/buffered-tuple-stream-v2.h +++ b/be/src/runtime/buffered-tuple-stream-v2.h @@ -21,6 +21,7 @@ #include <set> #include <vector> #include <boost/scoped_ptr.hpp> +#include <boost/function.hpp> #include "common/global-types.h" #include "common/status.h" @@ -51,14 +52,14 @@ class TupleRow; /// PrepareForReadWrite() is called to initialize both read and write iterators to enable /// interleaved reads and writes. /// -/// To use write-only mode, PrepareForWrite() is called once and AddRow()/AllocateRow() +/// To use write-only mode, PrepareForWrite() is called once and AddRow()/AddRowCustom() /// are called repeatedly to initialize then advance a write iterator through the stream. /// Once the stream is fully written, it can be read back by calling PrepareForRead() /// then GetNext() repeatedly to advance a read iterator through the stream, or by /// calling GetRows() to get all of the rows at once. /// /// To use read/write mode, PrepareForReadWrite() is called once to initialize the read -/// and write iterators. AddRow()/AllocateRow() then advance a write iterator through the +/// and write iterators. AddRow()/AddRowCustom() then advance a write iterator through the /// stream, and GetNext() advances a trailing read iterator through the stream. /// /// Buffer management: @@ -66,19 +67,32 @@ class TupleRow; /// the client's reservation to pin pages in memory. It will automatically try to /// increase the client's reservation whenever it needs to do so to make progress. 
/// +/// Normally pages are all of the same default page length, but larger pages up to the +/// max page length are used if needed to store rows that are too large for a +/// default-length page. +/// /// The stream has both pinned and unpinned modes. In the pinned mode all pages are /// pinned for reading. The pinned mode avoids I/O by keeping all pages pinned in memory /// and allows clients to save pointers to rows in the stream and randomly access them. /// E.g. hash tables can be backed by a BufferedTupleStream. In the unpinned mode, only /// pages currently being read and written are pinned and other pages are unpinned and -/// therefore do not use the client's reservation and can be spilled to disk. +/// therefore do not use the client's reservation and can be spilled to disk. The stream +/// always holds onto a default page's worth of reservation for the read and write +/// iterators (i.e. two pages' worth if the stream is in read/write mode), even if that +/// many pages are not currently pinned. This means that UnpinStream() always succeeds, +/// and moving to the next default-length write page or read page on an unpinned stream +/// does not require additional reservation. This is implemented by saving reservations +/// in SubReservations. /// -/// When the stream is in read/write mode, the stream always uses one buffer's worth -/// of reservation of writing and at least one buffer's worth of reservation for reading, -/// even if the same page is currently being read and written. This means that -/// UnpinStream() always succeeds, and moving to the next write page or read page on an -/// unpinned stream does not require additional reservation. -/// TODO: IMPALA-3208: variable-length pages will add a caveat here. +/// To read or write a row larger than the default page size to/from an unpinned stream, +/// the client must have max_page_len - default_page_len unused reservation.
Writing a +/// large row to an unpinned stream only uses the reservation for the duration of the +/// AddRow()/AddRowCustom() call. Reading a large row from an unpinned stream uses the +/// reservation until the next call to GetNext(). E.g. to partition a single unpinned +/// stream into n unpinned streams, the reservation needed is (n - 1) * +/// default_page_len + 2 * max_page_len: one large read buffer and one large write +/// buffer is needed to keep the row being processed in-memory, but only default-sized +/// buffers are needed for the other streams being written. /// /// The tuple stream also supports a 'delete_on_read' mode, enabled by passing a flag /// to PrepareForRead() which deletes the stream's pages as it does a final read @@ -90,7 +104,8 @@ class TupleRow; /// Page layout: /// Rows are stored back to back starting at the first byte of each page's buffer, with /// no interleaving of data from different rows. There is no padding or alignment -/// between rows. +/// between rows. Rows larger than the default page length are stored on their own +/// page. /// /// Tuple row layout: /// If the stream's tuples are nullable (i.e. has_nullable_tuple_ is true), there is a @@ -160,8 +175,8 @@ class TupleRow; /// to pin a single write page is required to write to the stream, regardless of the /// stream's size. /// 2. Pinned: Pages are left pinned. If the next page in the stream cannot be pinned -/// because the caller's reservation is insufficient (and could not be increased by -/// the stream), the read call will fail and the caller can either unpin the stream +/// because the client's reservation is insufficient (and could not be increased by +/// the stream), the read call will fail and the client can either unpin the stream /// or free up other memory before retrying. /// /// Memory lifetime of rows read from stream: @@ -172,15 +187,15 @@ class TupleRow; /// the stream may be freed on the next call to GetNext(). 
/// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers to the batch. /// -/// Manual construction of rows with AllocateRow(): -/// The BufferedTupleStream supports allocation of uninitialized rows with AllocateRow(). -/// AllocateRow() is called instead of AddRow() if the caller wants to manually construct -/// a row. The caller of AllocateRow() is responsible for writing the row with exactly the -/// layout described above. +/// Manual construction of rows with AddRowCustom(): +/// The BufferedTupleStream supports allocation of uninitialized rows with AddRowCustom(). +/// AddRowCustom() is called instead of AddRow() if the client wants to manually construct +/// a row. The caller of AddRowCustom() is responsible for providing a callback function +/// that writes the row with exactly the layout described above. /// /// If a caller constructs a tuple in this way, the caller can set the pointers and they /// will not be modified until the stream is read via GetNext() or GetRows(). -/// TODO: IMPALA-5007: try to remove AllocateRow() by unifying with AddRow(). +/// TODO: IMPALA-5007: try to remove AddRowCustom() by unifying with AddRow(). /// /// TODO: we need to be able to do read ahead for pages. We need some way to indicate a /// page will need to be pinned soon. @@ -192,10 +207,10 @@ class BufferedTupleStreamV2 { /// row_desc: description of rows stored in the stream. This is the desc for rows /// that are added and the rows being returned. 
/// page_len: the size of pages to use in the stream - /// TODO:IMPALA-3208: support a default and maximum page length /// ext_varlen_slots: set of varlen slots with data stored externally to the stream BufferedTupleStreamV2(RuntimeState* state, const RowDescriptor& row_desc, - BufferPool::ClientHandle* buffer_pool_client, int64_t page_len, + BufferPool::ClientHandle* buffer_pool_client, int64_t default_page_len, + int64_t max_page_len, const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>()); virtual ~BufferedTupleStreamV2(); @@ -206,16 +221,17 @@ class BufferedTupleStreamV2 { /// 'node_id' is only used for error reporting. Status Init(int node_id, bool pinned) WARN_UNUSED_RESULT; - /// Prepares the stream for writing by attempting to allocate a write buffer. Tries to - /// increase reservation if there is not enough unused reservation for the buffer. - /// Called after Init() and before the first AddRow() or AllocateRow() call. + /// Prepares the stream for writing by saving enough reservation for a default-size + /// write page. Tries to increase reservation if there is not enough unused reservation + /// for a page. Called after Init() and before the first AddRow() or AddRowCustom() call. /// 'got_reservation': set to true if there was enough reservation to initialize the /// first write page and false if there was not enough reservation and no other /// error was encountered. Undefined if an error status is returned. Status PrepareForWrite(bool* got_reservation) WARN_UNUSED_RESULT; - /// Prepares the stream for interleaved reads and writes by allocating read and write - /// buffers. Called after Init() and before the first AddRow() or AllocateRow() call. + /// Prepares the stream for interleaved reads and writes by saving enough reservation + /// for default-sized read and write pages. Called after Init() and before the first + /// AddRow() or AddRowCustom() call. /// delete_on_read: Pages are deleted after they are read. 
/// 'got_reservation': set to true if there was enough reservation to initialize the /// read and write pages and false if there was not enough reservation and no other @@ -224,7 +240,7 @@ class BufferedTupleStreamV2 { bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT; /// Prepares the stream for reading, invalidating the write iterator (if there is one). - /// Therefore must be called after the last AddRow() or AllocateRow() and before + /// Therefore must be called after the last AddRow() or AddRowCustom() and before /// GetNext(). PrepareForRead() can be called multiple times to do multiple read passes /// over the stream, unless PrepareForRead() or PrepareForReadWrite() was previously /// called with delete_on_read = true. @@ -237,31 +253,39 @@ class BufferedTupleStreamV2 { /// Adds a single row to the stream. There are four possible outcomes: /// a) The append succeeds. True is returned. /// b) The append fails because the unused reservation was not sufficient to add - /// a new page to the stream and the stream could not increase the reservation - /// sufficiently. Returns false and sets 'status' to OK. The append can be retried - /// after freeing up memory or unpinning the stream. + /// a new page to the stream large enough to fit 'row' and the stream could not + /// increase the reservation to get enough unused reservation. Returns false and + /// sets 'status' to OK. The append can be retried after freeing up memory or + /// unpinning the stream. /// c) The append fails with a runtime error. Returns false and sets 'status' to an /// error. /// d) The append fails because the row is too large to fit in a page of a stream. /// Returns false and sets 'status' to an error. /// - /// Unpinned streams avoid case b) because memory is automatically freed up by - /// unpinning the current write page.
- /// TODO: IMPALA-3808: update to reflect behaviour with variable-length pages + /// Unpinned streams can only encounter case b) when appending a row larger than + /// the default page size and the reservation could not be increased sufficiently. + /// Otherwise enough memory is automatically freed up by unpinning the current write + /// page. /// /// BufferedTupleStream will do a deep copy of the memory in the row. After AddRow() /// returns an error, it should not be called again. bool AddRow(TupleRow* row, Status* status) noexcept WARN_UNUSED_RESULT; - /// Allocates space to store a row of with fixed length 'fixed_size' and variable - /// length data 'varlen_size'. If successful, returns the pointer where fixed length - /// data should be stored and assigns 'varlen_data' to where var-len data should - /// be stored. AllocateRow does not currently support nullable tuples. + /// A function that writes a row to 'data' with the format described in the class + /// comment. /// - /// The meaning of the return values are the same as AddRow(), except failure is - /// indicated by returning NULL instead of false. - uint8_t* AllocateRow( - int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status); + /// Use boost::function instead of std::function because it is better at avoiding heap + /// allocations when capturing a small number of variables. In GCC 4.9.2/Boost 1.57, + /// boost::function can store up to 3 64-bit pointers without making a heap allocation, + /// but std::function always makes a heap allocation. + typedef boost::function<void(uint8_t* data)> WriteRowFn; + + /// Allocates space to store a row of 'size' bytes (including fixed and variable length + /// data). If successful, calls 'write_fn' with a pointer to the start of the allocated + /// space and returns true. Otherwise returns false. The failure modes are the same as + /// described in the AddRow() comment. 
+ ALWAYS_INLINE bool AddRowCustom( + int64_t size, const WriteRowFn& write_fn, Status* status); /// Unflattens 'flat_row' into a regular TupleRow 'row'. Only valid to call if the /// stream is pinned. The row must have been allocated with the stream's row desc. @@ -341,8 +365,8 @@ class BufferedTupleStreamV2 { bool is_closed() const { return closed_; } bool is_pinned() const { return pinned_; } - bool has_read_iterator() const { return read_page_ != pages_.end(); } - bool has_write_iterator() const { return write_page_ != nullptr; } + bool has_read_iterator() const { return has_read_iterator_; } + bool has_write_iterator() const { return has_write_iterator_; } std::string DebugString() const; @@ -384,9 +408,8 @@ class BufferedTupleStreamV2 { /// Description of rows stored in the stream. const RowDescriptor& desc_; - /// Sum of the fixed length portion of all the tuples in desc_, including any null - /// indicators. - int fixed_tuple_row_size_; + /// Plan node ID, used for error reporting. + int node_id_; /// The size of the fixed length portion for each tuple in the row. std::vector<int> fixed_tuple_sizes_; @@ -404,21 +427,31 @@ class BufferedTupleStreamV2 { BufferPool::ClientHandle* buffer_pool_client_; /// List of pages in the stream. - /// Empty before PrepareForWrite() is called or after the stream has been destructively - /// read in 'delete_on_read' mode. Non-empty otherwise. + /// Empty iff one of two cases applies: + /// * before the first row has been added with AddRow() or AddRowCustom(). + /// * after the stream has been destructively read in 'delete_on_read' mode std::list<Page> pages_; /// Total size of pages_, including any pages already deleted in 'delete_on_read' /// mode. int64_t total_byte_size_; - /// Iterator pointing to the current page for reading. Equal to list.end() when no - /// read iterator is active. 
GetNext() does not advance this past the end of - /// the stream, so upon eos 'read_page_' points to the last page and - /// rows_returned_ == num_rows_. Always pinned, unless a Pin() call failed and an - /// error status was returned. + /// True if there is currently an active read iterator for the stream. + bool has_read_iterator_; + + /// The current page being read. When no read iterator is active, equal to list.end(). + /// When a read iterator is active, either points to the current read page, or equals + /// list.end() if no rows have yet been read. GetNext() does not advance this past + /// the end of the stream, so upon eos 'read_page_' points to the last page and + /// rows_returned_ == num_rows_. Always pinned, unless a Pin() call failed and an error + /// status was returned. std::list<Page>::iterator read_page_; + /// Saved reservation for read iterator. 'default_page_len_' reservation is saved if + /// there is a read iterator, no pinned read page, and the possibility that the read + /// iterator will advance to a valid page. + BufferPool::SubReservation read_page_reservation_; + /// Number of rows returned from the current read_page_. uint32_t read_page_rows_returned_; @@ -438,11 +471,23 @@ class BufferedTupleStreamV2 { /// PrepareForRead() call. int64_t rows_returned_; - /// The current page for writing. NULL if there is no available page to write to. - /// Always pinned. If 'read_page_' and 'write_page_' reference the same page, then - /// that page is only pinned once. + /// True if there is currently an active write iterator into the stream. + bool has_write_iterator_; + + /// The current page for writing. NULL if there is no write iterator or no current + /// write page. Always pinned. Size is 'default_page_len_', except temporarily while + /// appending a larger row in AddRowCustomSlow(). Page* write_page_; + /// Saved reservation for write iterator. 
'default_page_len_' reservation is saved if + /// there is a write iterator, no page currently pinned for writing and the possibility + /// that a pin count will be needed for the write iterator in future. Specifically if: + /// * no rows have been appended to the stream and 'pages_' is empty, or + /// * the stream is unpinned, 'write_page_' is null and the last page in 'pages_' + /// is a large page that we advanced past, or + /// * there is only one pinned page in the stream and it is already pinned for reading. + BufferPool::SubReservation write_page_reservation_; + /// Total bytes of pinned pages in pages_, stored to avoid iterating over the list /// to compute it. int64_t bytes_pinned_; @@ -451,9 +496,14 @@ class BufferedTupleStreamV2 { /// a destructive 'delete_on_read' pass over the stream. int64_t num_rows_; - /// The length in bytes of pages used to store the stream's rows. - /// TODO: IMPALA-3808: support variable-length pages - const int64_t page_len_; + /// The default length in bytes of pages used to store the stream's rows. All rows that + /// fit in a default-sized page are stored in a default-sized page. + const int64_t default_page_len_; + + /// The maximum length in bytes of pages used to store the stream's rows. This is a + /// hard limit on the maximum size of row that can be stored in the stream and the + /// amount of reservation required to read or write to an unpinned stream. + const int64_t max_page_len_; /// Whether any tuple in the rows is nullable. const bool has_nullable_tuple_; @@ -468,53 +518,55 @@ class BufferedTupleStreamV2 { bool pinned_; bool is_read_page(const Page* page) const { - return has_read_iterator() && &*read_page_ == page; + return read_page_ != pages_.end() && &*read_page_ == page; } bool is_write_page(const Page* page) const { return write_page_ == page; } + /// Return true if the read and write page are the same.
+ bool has_read_write_page() const { + return write_page_ != nullptr && is_read_page(write_page_); + } + /// The slow path for AddRow() that is called if there is not sufficient space in /// the current page. bool AddRowSlow(TupleRow* row, Status* status) noexcept; - /// The slow path for AllocateRow() that is called if there is not sufficient space in + /// The slow path for AddRowCustom() that is called if there is not sufficient space in /// the current page. - uint8_t* AllocateRowSlow( - int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status) noexcept; + bool AddRowCustomSlow(int64_t size, const WriteRowFn& write_fn, Status* status) noexcept; + + /// Copies 'row' into the buffer starting at *data and ending at the byte before + /// 'data_end'. On success, returns true and updates *data to point after the last + /// byte written. Returns false if there is not enough space in the buffer provided. + bool DeepCopy(TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept; - /// Copies 'row' into write_page_. Returns false if there is not enough space in - /// 'write_page_'. After returning false, write_ptr_ may be left pointing to the - /// partially-written row, and no more data can be written to write_page_. + /// Templated implementation of DeepCopy(). template <bool HAS_NULLABLE_TUPLE> - bool DeepCopyInternal(TupleRow* row) noexcept; + bool DeepCopyInternal(TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept; - /// Helper function to copy strings in string_slots from tuple into write_page_. - /// Updates write_ptr_ to the end of the string data added. Returns false if the data - /// does not fit in the current write page. After returning false, write_ptr_ is left - /// pointing to the partially-written row, and no more data can be written to - /// write_page_. - bool CopyStrings(const Tuple* tuple, const std::vector<SlotDescriptor*>& string_slots); + /// Helper function to copy strings in string_slots from tuple into *data. 
+ /// Updates *data to the end of the string data added. Returns false if the data + /// does not fit in the buffer [*data, data_end). + static bool CopyStrings(const Tuple* tuple, + const std::vector<SlotDescriptor*>& string_slots, uint8_t** data, + const uint8_t* data_end); /// Helper function to deep copy collections in collection_slots from tuple into - /// write_page_. Updates write_ptr_ to the end of the collection data added. Returns - /// false if the data does not fit in the current write page. After returning false, - /// write_ptr_ is left pointing to the partially-written row, and no more data can be - /// written to write_page_. - bool CopyCollections( - const Tuple* tuple, const std::vector<SlotDescriptor*>& collection_slots); - - /// Wrapper of the templated DeepCopyInternal() function. - bool DeepCopy(TupleRow* row) noexcept; - - /// Gets a new page of 'page_len_' bytes from buffer_pool_, updating write_page_, - /// write_ptr_ and write_end_ptr_. The caller must ensure there is sufficient unused - /// reservation to allocate the page. The caller must reset the write iterator (if - /// there is one). - Status NewWritePage() noexcept WARN_UNUSED_RESULT; - - /// Validates that a page can fit a row of 'row_size' bytes. + /// the buffer [*data, data_end). Updates *data to the end of the collection data + /// added. Returns false if the data does not fit in the buffer. + static bool CopyCollections(const Tuple* tuple, + const std::vector<SlotDescriptor*>& collection_slots, uint8_t** data, + const uint8_t* data_end); + + /// Gets a new page of 'page_len' bytes from buffer_pool_, updating write_page_, + /// write_ptr_ and write_end_ptr_. The caller must ensure there is 'page_len' unused + /// reservation. The caller must reset the write page (if there is one) before calling. + Status NewWritePage(int64_t page_len) noexcept WARN_UNUSED_RESULT; + + /// Determines what page size is needed to fit a row of 'row_size' bytes. 
/// Returns an error if the row cannot fit in a page. - Status CheckPageSizeForRow(int64_t row_size); + Status CalcPageLenForRow(int64_t row_size, int64_t* page_len); /// Wrapper around NewWritePage() that allocates a new write page that fits a row of /// 'row_size' bytes. Increases reservation if needed to allocate the next page. @@ -525,9 +577,14 @@ class BufferedTupleStreamV2 { Status AdvanceWritePage( int64_t row_size, bool* got_reservation) noexcept WARN_UNUSED_RESULT; - /// Reset the write page, if there is one, and unpin pages accordingly. + /// Reset the write page, if there is one, and unpin pages accordingly. If there + /// is an active write iterator, the next row will be appended to a new page. void ResetWritePage(); + /// Invalidate the write iterator and release any resources associated with it. After + /// calling this, no more rows can be appended to the stream. + void InvalidateWriteIterator(); + /// Same as PrepareForRead(), except the iterators are not invalidated and /// the caller is assumed to have checked there is sufficient unused reservation. Status PrepareForReadInternal(bool delete_on_read) WARN_UNUSED_RESULT; @@ -537,8 +594,9 @@ class BufferedTupleStreamV2 { /// read_page_rows_returned_. Status NextReadPage() WARN_UNUSED_RESULT; - /// Reset the read page, if there is one, and unpin pages accordingly. - void ResetReadPage(); + /// Invalidate the read iterator, and release any resources associated with the active + /// iterator. + void InvalidateReadIterator(); /// Returns the total additional bytes that this row will consume in write_page_ if /// appended to the page. This includes the row's null indicators, the fixed length @@ -565,6 +623,36 @@ class BufferedTupleStreamV2 { /// read and write pages and whether the stream is pinned. int ExpectedPinCount(bool stream_pinned, const Page* page) const; + /// Return true if the stream in its current state needs to have a reservation for + /// a write page stored in 'write_page_reservation_'. 
+ bool NeedWriteReservation() const; + + /// Same as above, except assume the stream's 'pinned_' state is 'stream_pinned'. + bool NeedWriteReservation(bool stream_pinned) const; + + /// Same as above, except assume the stream has 'num_pages' pages and different + /// iterator state. + static bool NeedWriteReservation(bool stream_pinned, int64_t num_pages, + bool has_write_iterator, bool has_write_page, bool has_read_write_page); + + /// Return true if the stream in its current state needs to have a reservation for + /// a read page stored in 'read_page_reservation_'. + bool NeedReadReservation() const; + + /// Same as above, except assume the stream's 'pinned_' state is 'stream_pinned'. + bool NeedReadReservation(bool stream_pinned) const; + + /// Same as above, except assume the stream has 'num_pages' pages and a different + /// read iterator state. + bool NeedReadReservation(bool stream_pinned, int64_t num_pages, bool has_read_iterator, + bool has_read_page) const; + + /// Same as above, except assume the stream has 'num_pages' pages and a different + /// write iterator state. + static bool NeedReadReservation(bool stream_pinned, int64_t num_pages, + bool has_read_iterator, bool has_read_page, bool has_write_iterator, + bool has_write_page); + /// Templated GetNext implementations. 
template <bool FILL_FLAT_ROWS> Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/buffered-tuple-stream-v2.inline.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-v2.inline.h b/be/src/runtime/buffered-tuple-stream-v2.inline.h index 6ad4bc4..a3b219c 100644 --- a/be/src/runtime/buffered-tuple-stream-v2.inline.h +++ b/be/src/runtime/buffered-tuple-stream-v2.inline.h @@ -31,31 +31,23 @@ inline int BufferedTupleStreamV2::NullIndicatorBytesPerRow() const { return BitUtil::RoundUpNumBytes(fixed_tuple_sizes_.size()); } -inline bool BufferedTupleStreamV2::AddRow(TupleRow* row, Status* status) noexcept { +inline bool BufferedTupleStreamV2::AddRowCustom( + int64_t size, const WriteRowFn& write_fn, Status* status) { DCHECK(!closed_); - if (LIKELY(DeepCopy(row))) return true; - return AddRowSlow(row, status); -} - -inline uint8_t* BufferedTupleStreamV2::AllocateRow( - int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status) { - DCHECK(!closed_); - DCHECK(!has_nullable_tuple_) << "AllocateRow does not support nullable tuples"; - const int total_size = fixed_size + varlen_size; - if (UNLIKELY(write_page_ == nullptr || write_ptr_ + total_size > write_end_ptr_)) { - return AllocateRowSlow(fixed_size, varlen_size, varlen_data, status); + DCHECK(has_write_iterator()); + if (UNLIKELY(write_page_ == nullptr || write_ptr_ + size > write_end_ptr_)) { + return AddRowCustomSlow(size, write_fn, status); } DCHECK(write_page_ != nullptr); DCHECK(write_page_->is_pinned()); - DCHECK_LE(write_ptr_ + total_size, write_end_ptr_); + DCHECK_LE(write_ptr_ + size, write_end_ptr_); ++num_rows_; ++write_page_->num_rows; - uint8_t* fixed_data = write_ptr_; - write_ptr_ += fixed_size; - *varlen_data = write_ptr_; - write_ptr_ += varlen_size; - return fixed_data; + uint8_t* data = write_ptr_; + 
write_ptr_ += size; + write_fn(data); + return true; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/bufferpool/buffer-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc index 6185e36..1fb60ed 100644 --- a/be/src/runtime/bufferpool/buffer-pool-test.cc +++ b/be/src/runtime/bufferpool/buffer-pool-test.cc @@ -1888,6 +1888,39 @@ void BufferPoolTest::TestRandomInternalImpl(BufferPool* pool, FileGroup* file_gr for (auto& buffer : buffers) pool->FreeBuffer(&client, &buffer.first); pool->DeregisterClient(&client); } + +/// Test basic SubReservation functionality. +TEST_F(BufferPoolTest, SubReservation) { + const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 10; + global_reservations_.InitRootTracker(NULL, TOTAL_MEM); + BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM); + BufferPool::ClientHandle client; + ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL, + TOTAL_MEM, NewProfile(), &client)); + ASSERT_TRUE(client.IncreaseReservationToFit(TEST_BUFFER_LEN)); + + BufferPool::SubReservation subreservation(&client); + BufferPool::BufferHandle buffer; + // Save and check that the reservation moved as expected. + client.SaveReservation(&subreservation, TEST_BUFFER_LEN); + EXPECT_EQ(0, client.GetUnusedReservation()); + EXPECT_EQ(TEST_BUFFER_LEN, subreservation.GetReservation()); + + // Should not be able to allocate from client since the reservation was moved. + IMPALA_ASSERT_DEBUG_DEATH(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN), ""); + + // Restore and check that the reservation moved as expected. + client.RestoreReservation(&subreservation, TEST_BUFFER_LEN); + EXPECT_EQ(TEST_BUFFER_LEN, client.GetUnusedReservation()); + EXPECT_EQ(0, subreservation.GetReservation()); + + // Should be able to allocate from the client after restoring. 
+ ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN)); + EXPECT_EQ(TEST_BUFFER_LEN, client.GetUnusedReservation()); + + subreservation.Close(); + pool.DeregisterClient(&client); +} } int main(int argc, char** argv) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/bufferpool/buffer-pool.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc index 5593a41..9b16112 100644 --- a/be/src/runtime/bufferpool/buffer-pool.cc +++ b/be/src/runtime/bufferpool/buffer-pool.cc @@ -308,6 +308,40 @@ int64_t BufferPool::ClientHandle::GetUnusedReservation() const { return impl_->reservation()->GetUnusedReservation(); } +void BufferPool::ClientHandle::SaveReservation(SubReservation* dst, int64_t bytes) { + DCHECK_EQ(dst->tracker_->parent(), impl_->reservation()); + bool success = impl_->reservation()->TransferReservationTo(dst->tracker_.get(), bytes); + DCHECK(success); // SubReservation should not have a limit, so this shouldn't fail. +} + +void BufferPool::ClientHandle::RestoreReservation(SubReservation* src, int64_t bytes) { + DCHECK_EQ(src->tracker_->parent(), impl_->reservation()); + bool success = src->tracker_->TransferReservationTo(impl_->reservation(), bytes); + DCHECK(success); // Transferring reservation to parent shouldn't fail. +} + +BufferPool::SubReservation::SubReservation(ClientHandle* client) { + tracker_.reset(new ReservationTracker); + tracker_->InitChildTracker( + nullptr, client->impl_->reservation(), nullptr, numeric_limits<int64_t>::max()); +} + +BufferPool::SubReservation::~SubReservation() {} + +int64_t BufferPool::SubReservation::GetReservation() const { + return tracker_->GetReservation(); +} + +void BufferPool::SubReservation::Close() { + // Give any reservation back to the client. 
+ if (is_closed()) return; + bool success = + tracker_->TransferReservationTo(tracker_->parent(), tracker_->GetReservation()); + DCHECK(success); // Transferring reservation to parent shouldn't fail. + tracker_->Close(); + tracker_.reset(); +} + BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group, const string& name, ReservationTracker* parent_reservation, MemTracker* mem_tracker, int64_t reservation_limit, RuntimeProfile* profile) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/bufferpool/buffer-pool.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h index 93c08bd..f2ff99b 100644 --- a/be/src/runtime/bufferpool/buffer-pool.h +++ b/be/src/runtime/bufferpool/buffer-pool.h @@ -151,6 +151,7 @@ class BufferPool : public CacheLineAligned { class BufferHandle; class ClientHandle; class PageHandle; + class SubReservation; /// Constructs a new buffer pool. /// 'min_buffer_len': the minimum buffer length for the pool. Must be a power of two. @@ -323,6 +324,14 @@ class BufferPool::ClientHandle { /// if successful, after which 'bytes' can be used. bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT; + /// Move some of this client's reservation to the SubReservation. 'bytes' of unused + /// reservation must be available in this tracker. + void SaveReservation(SubReservation* dst, int64_t bytes); + + /// Move some of src's reservation to this client. 'bytes' of unused reservation must be + /// available in 'src'. + void RestoreReservation(SubReservation* src, int64_t bytes); + /// Accessors for this client's reservation corresponding to the identically-named /// methods in ReservationTracker. 
int64_t GetReservation() const; @@ -336,6 +345,7 @@ class BufferPool::ClientHandle { private: friend class BufferPool; friend class BufferPoolTest; + friend class SubReservation; DISALLOW_COPY_AND_ASSIGN(ClientHandle); /// Internal state for the client. NULL means the client isn't registered. @@ -343,6 +353,31 @@ class BufferPool::ClientHandle { Client* impl_; }; +/// Helper class that allows dividing up a client's reservation into separate buckets. +class BufferPool::SubReservation { + public: + SubReservation(ClientHandle* client); + ~SubReservation(); + + /// Returns the amount of reservation stored in this sub-reservation. + int64_t GetReservation() const; + + /// Releases the subreservation to the client's tracker. Must be called before + /// destruction. + void Close(); + + bool is_closed() const { return tracker_ == nullptr; } + + private: + friend class BufferPool::ClientHandle; + DISALLOW_COPY_AND_ASSIGN(SubReservation); + + /// Child of the client's tracker used to track the sub-reservation. Usage is not + /// tracked against this tracker - instead the reservation is always transferred back + /// to the client's tracker before use. + boost::scoped_ptr<ReservationTracker> tracker_; +}; + /// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only /// be used by a single thread at a time: concurrently calling BufferHandle methods or /// BufferPool methods with the BufferHandle as an argument is not supported. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/bufferpool/reservation-tracker-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc index a794a91..4197340 100644 --- a/be/src/runtime/bufferpool/reservation-tracker-test.cc +++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc @@ -359,7 +359,7 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) { ASSERT_EQ(amount, mem_trackers[ancestor]->consumption()); } // Return the reservation to the root before the next iteration. - reservations[level].TransferReservationTo(&reservations[0], amount); + ASSERT_TRUE(reservations[level].TransferReservationTo(&reservations[0], amount)); } for (int i = HIERARCHY_DEPTH - 1; i >= 0; --i) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/be/src/runtime/bufferpool/reservation-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h index 5be2c23..4d525c0 100644 --- a/be/src/runtime/bufferpool/reservation-tracker.h +++ b/be/src/runtime/bufferpool/reservation-tracker.h @@ -134,7 +134,7 @@ class ReservationTracker { /// (because linked MemTrackers with limits below the query level are not supported). /// Returns true on success or false if the transfer would have caused a reservation /// limit to be exceeded. - bool TransferReservationTo(ReservationTracker* other, int64_t bytes); + bool TransferReservationTo(ReservationTracker* other, int64_t bytes) WARN_UNUSED_RESULT; /// Allocate 'bytes' from the reservation. The tracker must have at least 'bytes' /// unused reservation before calling this method. @@ -159,6 +159,8 @@ class ReservationTracker { /// Returns the total reservations of children in bytes. 
int64_t GetChildReservations(); + ReservationTracker* parent() const { return parent_; } + std::string DebugString(); private: http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a3ce5b44/common/thrift/generate_error_codes.py ---------------------------------------------------------------------- diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index 8611295..bccb779 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -319,6 +319,10 @@ error_codes = ( ("KUDU_TIMESTAMP_OUT_OF_RANGE", 103, "Kudu table '$0' column '$1' contains an out of range timestamp. " "The valid date range is 1400-01-01..9999-12-31."), + + # TODO: IMPALA-3200: make sure that this references the correct query option. + ("MAX_ROW_SIZE", 104, "Row of size $0 could not be materialized in plan node with " + "id $1. Limit is $2, which can be increased with query option max_row_size"), ) import sys
