http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index b4ef279..ee0e4be 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -17,15 +17,20 @@
#include "runtime/sorter.h"
+#include <limits>
+
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_int.hpp>
#include <gutil/strings/substitute.h>
-#include "runtime/buffered-block-mgr.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/sorted-run-merger.h"
+#include "util/pretty-printer.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
@@ -36,7 +41,7 @@ using namespace strings;
namespace impala {
-// Number of pinned blocks required for a merge with fixed-length data only.
+// Number of pinned pages required for a merge with fixed-length data only.
const int MIN_BUFFERS_PER_MERGE = 3;
// Maximum number of buffers to use in each merge to prevent sorter trying to grab
@@ -46,35 +51,140 @@ const int MIN_BUFFERS_PER_MERGE = 3;
// we should base this on the number of reservations.
const int MAX_BUFFERS_PER_MERGE = 128;
-const string MEM_ALLOC_FAILED_ERROR_MSG = "Failed to allocate block for $0-length "
- "data needed for sorting. Reducing query concurrency or increasing the "
- "memory limit may help this query to complete successfully.";
-
-const string MERGE_FAILED_ERROR_MSG = "Failed to allocate block to merge spilled runs "
+const string MERGE_FAILED_ERROR_MSG = "Failed to allocate page to merge spilled runs "
"during sorting. Only $0 runs could be merged, but must be able to merge at least 2 "
"to make progress. Reducing query concurrency or increasing the memory limit may "
"help this query to complete successfully.";
-/// Delete all non-null blocks in blocks and clear vector.
-static void DeleteAndClearBlocks(vector<BufferedBlockMgr::Block*>* blocks) {
- for (BufferedBlockMgr::Block* block: *blocks) {
- if (block != NULL) block->Delete();
+/// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
+/// The Page can be in four states:
+/// * Closed: The page starts in this state before Init() is called. Calling
+/// ExtractBuffer() or Close() puts the page back in this state. No other operations
+/// are valid on a closed page.
+/// * In memory: the page is pinned and the buffer is in memory. data() is valid. The
+/// page is in this state after Init(). If the page is pinned but not in memory, it
+/// can be brought into this state by calling WaitForBuffer().
+/// * Unpinned: the page was unpinned by calling Unpin(). It is invalid to access the
+/// page's buffer.
+/// * Pinned but not in memory: Pin() was called on the unpinned page, but
+/// WaitForBuffer() has not been called. It is invalid to access the page's buffer.
+class Sorter::Page {
+ public:
+ Page() { Reset(); }
+
+ /// Create a new page of length 'sorter->page_len_' bytes using
+ /// 'sorter->buffer_pool_client_'. Caller must ensure the client has enough
+ /// reservation for the page.
+ Status Init(Sorter* sorter) WARN_UNUSED_RESULT {
+ const BufferPool::BufferHandle* page_buffer;
+ RETURN_IF_ERROR(pool()->CreatePage(sorter->buffer_pool_client_, sorter->page_len_,
+ &handle_, &page_buffer));
+ data_ = page_buffer->data();
+ return Status::OK();
}
- blocks->clear();
-}
-static int NumNonNullBlocks(const vector<BufferedBlockMgr::Block*>& blocks) {
- int count = 0;
- for (BufferedBlockMgr::Block* block: blocks) {
- if (block != NULL) ++count;
+ /// Extract the buffer from the page. The page must be in memory. When this function
+ /// returns the page is closed.
+ BufferPool::BufferHandle ExtractBuffer(BufferPool::ClientHandle* client) {
+ DCHECK(data_ != nullptr) << "Page must be in memory";
+ BufferPool::BufferHandle buffer;
+ Status status = pool()->ExtractBuffer(client, &handle_, &buffer);
+ DCHECK(status.ok()) << "Page was in memory, ExtractBuffer() shouldn't fail";
+ Reset();
+ return buffer;
+ }
+
+ /// Allocate 'len' bytes in the current page. The page must be in memory, and the
+ /// amount to allocate cannot exceed BytesRemaining().
+ uint8_t* AllocateBytes(int64_t len) {
+ DCHECK_GE(len, 0);
+ DCHECK_LE(len, BytesRemaining());
+ DCHECK(data_ != nullptr);
+ uint8_t* result = data_ + valid_data_len_;
+ valid_data_len_ += len;
+ return result;
+ }
+
+ /// Free the last 'len' bytes allocated from AllocateBytes(). The page must be in
+ /// memory.
+ void FreeBytes(int64_t len) {
+ DCHECK_GE(len, 0);
+ DCHECK_LE(len, valid_data_len_);
+ DCHECK(data_ != nullptr);
+ valid_data_len_ -= len;
+ }
+
+ /// Return number of bytes remaining in page.
+ int64_t BytesRemaining() { return len() - valid_data_len_; }
+
+ /// Brings a pinned page into memory, if not already in memory, and sets 'data_' to
+ /// point to the page's buffer.
+ Status WaitForBuffer() WARN_UNUSED_RESULT {
+ DCHECK(handle_.is_pinned());
+ if (data_ != nullptr) return Status::OK();
+ const BufferPool::BufferHandle* page_buffer;
+ RETURN_IF_ERROR(handle_.GetBuffer(&page_buffer));
+ data_ = page_buffer->data();
+ return Status::OK();
+ }
+
+ /// Helper to pin the page. Caller must ensure the client has enough reservation
+ /// remaining to pin the page. Only valid to call on an unpinned page.
+ Status Pin(BufferPool::ClientHandle* client) WARN_UNUSED_RESULT {
+ DCHECK(!handle_.is_pinned());
+ return pool()->Pin(client, &handle_);
+ }
+
+ /// Helper to unpin the page.
+ void Unpin(BufferPool::ClientHandle* client) {
+ pool()->Unpin(client, &handle_);
+ data_ = nullptr;
+ }
+
+ /// Destroy the page with 'client'.
+ void Close(BufferPool::ClientHandle* client) {
+ pool()->DestroyPage(client, &handle_);
+ Reset();
}
- return count;
-}
+
+ int64_t valid_data_len() const { return valid_data_len_; }
+ /// Returns a pointer to the start of the page's buffer. Only valid to call if the
+ /// page is in memory.
+ uint8_t* data() const {
+ DCHECK(data_ != nullptr);
+ return data_;
+ }
+ int64_t len() const { return handle_.len(); }
+ bool is_open() const { return handle_.is_open(); }
+ bool is_pinned() const { return handle_.is_pinned(); }
+ std::string DebugString() const { return handle_.DebugString(); }
+
+ private:
+ /// Reset the page to an uninitialized state. 'handle_' must already be closed.
+ void Reset() {
+ DCHECK(!handle_.is_open());
+ valid_data_len_ = 0;
+ data_ = nullptr;
+ }
+
+ /// Helper to get the singleton buffer pool.
+ static BufferPool* pool() { return ExecEnv::GetInstance()->buffer_pool(); }
+
+ BufferPool::PageHandle handle_;
+
+ /// Length of valid data written to the page.
+ int64_t valid_data_len_;
+
+ /// Cached pointer to the buffer in 'handle_'. NULL if the page is unpinned. May be
+ /// NULL or non-NULL if the page is pinned. Can be populated by calling
+ /// WaitForBuffer() on a pinned page.
+ uint8_t* data_;
+};
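[A minimal sketch, editorial and not part of the patch, of the Page lifecycle described
above; 'sorter' and 'client' are assumed to be a valid Sorter* and its
BufferPool::ClientHandle* with sufficient reservation.]

    Sorter::Page page;
    RETURN_IF_ERROR(page.Init(sorter));     // Closed -> in memory (pinned)
    uint8_t* dst = page.AllocateBytes(64);  // write through 'dst' while in memory
    page.Unpin(client);                     // in memory -> unpinned; buffer invalid
    RETURN_IF_ERROR(page.Pin(client));      // unpinned -> pinned, maybe not in memory
    RETURN_IF_ERROR(page.WaitForBuffer());  // pinned -> in memory; data() valid again
    page.Close(client);                     // back to Closed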
/// A run is a sequence of tuples. The run can be sorted or unsorted (in which case the
-/// Sorter will sort it). A run comprises a sequence of fixed-length blocks containing the
+/// Sorter will sort it). A run comprises a sequence of fixed-length pages containing the
/// tuples themselves (i.e. fixed-len slots that may contain ptrs to var-length data), and
-/// an optional sequence of var-length blocks containing the var-length data.
+/// an optional sequence of var-length pages containing the var-length data.
///
/// Runs are either "initial runs" constructed from the sorter's input by evaluating
/// the expressions in 'sort_tuple_exprs_' or "intermediate runs" constructed
@@ -84,7 +194,7 @@ static int NumNonNullBlocks(const vector<BufferedBlockMgr::Block*>& blocks) {
/// sorted run.
///
/// The expected calling sequence of functions is as follows:
-/// * Init() to initialize the run and allocate initial blocks.
+/// * Init() to initialize the run and allocate initial pages.
/// * Add*Batch() to add batches of tuples to the run.
/// * FinalizeInput() to signal that no more batches will be added.
/// * If the run is unsorted, it must be sorted. After that set_sorted() must be called.
@@ -92,29 +202,30 @@ static int NumNonNullBlocks(const vector<BufferedBlockMgr::Block*>& blocks) {
/// * PrepareRead() to allocate resources for reading the run.
/// * GetNext() (if there was a single run) or GetNextBatch() (when merging multiple runs)
/// to read from the run.
-/// * Once reading is done, DeleteAllBlocks() should be called to free resources.
+/// * Once reading is done, CloseAllPages() should be called to free resources.
class Sorter::Run {
public:
Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool initial_run);
~Run() {
- DCHECK(fixed_len_blocks_.empty());
- DCHECK(var_len_blocks_.empty());
- DCHECK(var_len_copy_block_ == NULL);
+ DCHECK(fixed_len_pages_.empty());
+ DCHECK(var_len_pages_.empty());
+ DCHECK(!var_len_copy_page_.is_open());
}
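[A minimal sketch, editorial and not part of the patch, of the calling sequence
documented above, for an initial run; 'sorter', 'sort_tuple_desc', 'batch' and
'output_batch' are assumed, and the sort step is elided.]

    Sorter::Run run(sorter, sort_tuple_desc, /*initial_run=*/true);
    RETURN_IF_ERROR(run.Init());
    int num_processed;
    RETURN_IF_ERROR(run.AddInputBatch(batch, 0, &num_processed));
    RETURN_IF_ERROR(run.FinalizeInput());
    // ... sort the run's tuples, then mark it sorted via set_sorted() ...
    bool pinned;
    RETURN_IF_ERROR(run.PrepareRead(&pinned));
    bool eos = false;
    while (!eos) RETURN_IF_ERROR(run.GetNext<false>(output_batch, &eos));
    run.CloseAllPages();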
/// Initialize the run for input rows by allocating the minimum number of required
- /// blocks - one block for fixed-len data added to fixed_len_blocks_, one for the
- /// initially unsorted var-len data added to var_len_blocks_, and one to copy sorted
- /// var-len data into var_len_copy_block_.
- Status Init();
+ /// pages - one page for fixed-len data added to fixed_len_pages_, one for the
+ /// initially unsorted var-len data added to var_len_pages_, and one to copy sorted
+ /// var-len data into var_len_copy_page_.
+ Status Init() WARN_UNUSED_RESULT;
/// Add the rows from 'batch' starting at 'start_index' to the current run. Returns the
- /// number of rows actually added in 'num_processed'. If the run is full (no more blocks
+ /// number of rows actually added in 'num_processed'. If the run is full (no more pages
/// can be allocated), 'num_processed' may be less than the number of remaining rows in
/// the batch. AddInputBatch() materializes the input rows using the expressions in
/// sorter_->sort_tuple_expr_evals_, while AddIntermediateBatch() just copies rows.
- Status AddInputBatch(RowBatch* batch, int start_index, int* num_processed) {
+ Status AddInputBatch(
+ RowBatch* batch, int start_index, int* num_processed) WARN_UNUSED_RESULT {
DCHECK(initial_run_);
if (has_var_len_slots_) {
return AddBatchInternal<true, true>(batch, start_index, num_processed);
@@ -122,7 +233,9 @@ class Sorter::Run {
return AddBatchInternal<false, true>(batch, start_index, num_processed);
}
}
- Status AddIntermediateBatch(RowBatch* batch, int start_index, int*
num_processed) {
+
+ Status AddIntermediateBatch(
+ RowBatch* batch, int start_index, int* num_processed) WARN_UNUSED_RESULT {
DCHECK(!initial_run_);
if (has_var_len_slots_) {
return AddBatchInternal<true, false>(batch, start_index, num_processed);
@@ -133,53 +246,53 @@ class Sorter::Run {
/// Called after the final call to Add*Batch() to do any bookkeeping necessary to
/// finalize the run. Must be called before sorting or merging the run.
- Status FinalizeInput();
+ Status FinalizeInput() WARN_UNUSED_RESULT;
- /// Unpins all the blocks in a sorted run. Var-length column data is copied into new
- /// blocks in sorted order. Pointers in the original tuples are converted to offsets
- /// from the beginning of the sequence of var-len data blocks. Returns an error and
- /// may leave some blocks pinned if an error is encountered in the block mgr.
- Status UnpinAllBlocks();
+ /// Unpins all the pages in a sorted run. Var-length column data is copied into new
+ /// pages in sorted order. Pointers in the original tuples are converted to offsets
+ /// from the beginning of the sequence of var-len data pages. Returns an error and
+ /// may leave some pages pinned if an error is encountered.
+ Status UnpinAllPages() WARN_UNUSED_RESULT;
- /// Deletes all blocks.
- void DeleteAllBlocks();
+ /// Closes all pages and clears vectors of pages.
+ void CloseAllPages();
- /// Prepare to read a sorted run. Pins the first block(s) in the run if the run was
+ /// Prepare to read a sorted run. Pins the first page(s) in the run if the run was
/// previously unpinned. If the run was unpinned, try to pin the initial fixed and
- /// var len blocks in the run. If it couldn't pin them, set pinned_all_blocks to false.
- /// In that case, none or one of the initial blocks may be pinned and it is valid to
- /// call PrepareRead() again to retry pinning the remainder. pinned_all_blocks is
- /// always set to true if the run is pinned.
- Status PrepareRead(bool* pinned_all_blocks);
+ /// var len pages in the run. If it couldn't pin them, set pinned to false.
+ /// In that case, none of the initial pages will be pinned and it is valid to
+ /// call PrepareRead() again to retry pinning. pinned is always set to
+ /// true if the run was pinned.
+ Status PrepareRead(bool* pinned) WARN_UNUSED_RESULT;
/// Interface for merger - get the next batch of rows from this run. This run still
/// owns the returned batch. Calls GetNext(RowBatch*, bool*).
- Status GetNextBatch(RowBatch** sorted_batch);
+ Status GetNextBatch(RowBatch** sorted_batch) WARN_UNUSED_RESULT;
/// Fill output_batch with rows from this run. If CONVERT_OFFSET_TO_PTR is true, offsets
/// in var-length slots are converted back to pointers. Only row pointers are copied
/// into output_batch. eos is set to true after all rows from the run are returned.
- /// If eos is true, the returned output_batch has zero rows and has no attached blocks.
- /// If this run was unpinned, one block (two if there are var-len slots) is pinned while
- /// rows are filled into output_batch. The block is unpinned before the next block is
- /// pinned, so at most one (two if there are var-len slots) block(s) will be pinned at
- /// once. If the run was pinned, the blocks are not unpinned and each block is attached
- /// to 'output_batch' once all rows referencing data in the block have been returned,
+ /// If eos is true, the returned output_batch has zero rows and has no attached pages.
+ /// If this run was unpinned, one page (two if there are var-len slots) is pinned while
+ /// rows are filled into output_batch. The page is unpinned before the next page is
+ /// pinned, so at most one (two if there are var-len slots) page(s) will be pinned at
+ /// once. If the run was pinned, the pages are not unpinned and each page is attached
+ /// to 'output_batch' once all rows referencing data in the page have been returned,
/// either in the current batch or previous batches. In both pinned and unpinned cases,
- /// all rows in output_batch will reference at most one fixed-len and one var-len block.
+ /// all rows in output_batch will reference at most one fixed-len and one var-len page.
template <bool CONVERT_OFFSET_TO_PTR>
- Status GetNext(RowBatch* output_batch, bool* eos);
+ Status GetNext(RowBatch* output_batch, bool* eos) WARN_UNUSED_RESULT;
- /// Delete all blocks in 'runs' and clear 'runs'.
+ /// Delete all pages in 'runs' and clear 'runs'.
static void CleanupRuns(deque<Run*>* runs) {
- for (Run* run: *runs) {
- run->DeleteAllBlocks();
+ for (Run* run : *runs) {
+ run->CloseAllPages();
}
runs->clear();
}
- /// Return total amount of fixed and var len data in run, not including blocks that
- /// were already transferred.
+ /// Return total amount of fixed and var len data in run, not including pages that
+ /// were already transferred or closed.
int64_t TotalBytes() const;
inline bool is_pinned() const { return is_pinned_; }
@@ -196,34 +309,42 @@ class Sorter::Run {
/// INITIAL_RUN and HAS_VAR_LEN_SLOTS are template arguments for performance and must
/// match 'initial_run_' and 'has_var_len_slots_'.
template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
- Status AddBatchInternal(RowBatch* batch, int start_index, int* num_processed);
+ Status AddBatchInternal(
+ RowBatch* batch, int start_index, int* num_processed) WARN_UNUSED_RESULT;
- /// Finalize the list of blocks: delete empty final blocks and unpin the previous block
+ /// Finalize the list of pages: delete empty final pages and unpin the previous page
/// if the run is unpinned.
- Status FinalizeBlocks(vector<BufferedBlockMgr::Block*>* blocks);
+ Status FinalizePages(vector<Page>* pages) WARN_UNUSED_RESULT;
/// Collect the non-null var-len (e.g. STRING) slots from 'src' in 'var_len_values' and
/// return the total length of all var-len values in 'total_var_len'.
- void CollectNonNullVarSlots(Tuple* src, vector<StringValue*>* var_len_values,
- int* total_var_len);
+ void CollectNonNullVarSlots(
+ Tuple* src, vector<StringValue*>* var_len_values, int* total_var_len);
- enum AddBlockMode { KEEP_PREV_PINNED, UNPIN_PREV };
+ enum AddPageMode { KEEP_PREV_PINNED, UNPIN_PREV };
- /// Try to extend the current run by a block. If 'mode' is KEEP_PREV_PINNED, try to
- /// allocate a new block, which may fail to extend the run due to lack of memory. If
- /// mode is 'UNPIN_PREV', unpin the previous block in block_sequence before allocating
- /// and adding a new block - this never fails due to lack of memory.
+ /// Try to extend the current run by a page. If 'mode' is KEEP_PREV_PINNED, try to
+ /// allocate a new page, which may fail to extend the run due to lack of memory. If
+ /// mode is 'UNPIN_PREV', unpin the previous page in page_sequence before allocating
+ /// and adding a new page - this never fails due to lack of memory.
///
- /// Returns an error status only if the block manager returns an error. If no error is
+ /// Returns an error status only if the buffer pool returns an error. If no error is
/// encountered, sets 'added' to indicate whether the run was extended and returns
- /// Status::OK(). The new block is appended to 'block_sequence'.
- Status TryAddBlock(AddBlockMode mode, vector<BufferedBlockMgr::Block*>* block_sequence,
- bool* added);
+ /// Status::OK(). The new page is appended to 'page_sequence'.
+ Status TryAddPage(
+ AddPageMode mode, vector<Page>* page_sequence, bool* added) WARN_UNUSED_RESULT;
+
+ /// Adds a new page to 'page_sequence'. Caller must ensure enough reservation
+ /// is available to create the page.
+ ///
+ /// Returns an error status only if the buffer pool returns an error. If an error
+ /// is returned 'page_sequence' is left unmodified.
+ Status AddPage(vector<Page>* page_sequence) WARN_UNUSED_RESULT;
- /// Advance to the next read block. If the run is pinned, has no effect. If the run
- /// is unpinned, atomically pin the block at 'block_index' + 1 in 'blocks' and delete
- /// the block at 'block_index'.
- Status PinNextReadBlock(vector<BufferedBlockMgr::Block*>* blocks, int block_index);
+ /// Advance to the next read page. If the run is pinned, has no effect. If the run
+ /// is unpinned, atomically pin the page at 'page_index' + 1 in 'pages' and delete
+ /// the page at 'page_index'.
+ Status PinNextReadPage(vector<Page>* pages, int page_index) WARN_UNUSED_RESULT;
/// Copy the StringValues in 'var_values' to 'dest' in order and update the StringValue
/// ptrs in 'dest' to point to the copied data.
@@ -231,25 +352,41 @@ class Sorter::Run {
/// Copy the StringValues in 'var_values' to 'dest' in order. Update the StringValue
/// ptrs in 'dest' to contain a packed offset for the copied data comprising
- /// block_index and the offset relative to block_start.
- void CopyVarLenDataConvertOffset(const vector<StringValue*>& var_values,
- int block_index, const uint8_t* block_start, uint8_t* dest);
+ /// page_index and the offset relative to page_start.
+ void CopyVarLenDataConvertOffset(const vector<StringValue*>& var_values, int page_index,
+ const uint8_t* page_start, uint8_t* dest);
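[An editorial illustration of the packed offset mentioned above; the exact bit split is
an assumption, not taken from the patch.]

    // Hypothetical encoding: page index in the upper 32 bits, byte offset
    // within the page in the lower 32 bits of the pointer-sized slot.
    uint64_t Pack(uint32_t page_index, uint32_t offset_in_page) {
      return (static_cast<uint64_t>(page_index) << 32) | offset_in_page;
    }
    void Unpack(uint64_t packed, uint32_t* page_index, uint32_t* offset_in_page) {
      *page_index = static_cast<uint32_t>(packed >> 32);
      *offset_in_page = static_cast<uint32_t>(packed);
    }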
/// Convert encoded offsets to valid pointers in tuple with layout 'sort_tuple_desc_'.
- /// 'tuple' is modified in-place. Returns true if the pointers refer to the block at
- /// 'var_len_blocks_index_' and were successfully converted or false if the var len
- /// data is in the next block, in which case 'tuple' is unmodified.
+ /// 'tuple' is modified in-place. Returns true if the pointers refer to the page at
+ /// 'var_len_pages_index_' and were successfully converted or false if the var len
+ /// data is in the next page, in which case 'tuple' is unmodified.
bool ConvertOffsetsToPtrs(Tuple* tuple);
- /// Returns true if we have var-len blocks in the run.
- inline bool HasVarLenBlocks() const {
- // Shouldn't have any blocks unless there are slots.
- DCHECK(var_len_blocks_.empty() || has_var_len_slots_);
- return !var_len_blocks_.empty();
+ /// Returns true if we have var-len pages in the run.
+ inline bool HasVarLenPages() const {
+ // Shouldn't have any pages unless there are slots.
+ DCHECK(var_len_pages_.empty() || has_var_len_slots_);
+ return !var_len_pages_.empty();
+ }
+
+ static int NumOpenPages(const vector<Page>& pages) {
+ int count = 0;
+ for (const Page& page : pages) {
+ if (page.is_open()) ++count;
+ }
+ return count;
+ }
+
+ /// Close all open pages and clear vector.
+ void DeleteAndClearPages(vector<Page>* pages) {
+ for (Page& page : *pages) {
+ if (page.is_open()) page.Close(sorter_->buffer_pool_client_);
+ }
+ pages->clear();
}
/// Parent sorter object.
- const Sorter* sorter_;
+ Sorter* const sorter_;
/// Materialized sort tuple. Input rows are materialized into 1 tuple (with descriptor
/// sort_tuple_desc_) before sorting.
@@ -258,10 +395,10 @@ class Sorter::Run {
/// The size in bytes of the sort tuple.
const int sort_tuple_size_;
- /// Number of tuples per block in a run. This gets multiplied with
- /// TupleIterator::block_index_ in various places and to make sure we don't overflow the
+ /// Number of tuples per page in a run. This gets multiplied with
+ /// TupleIterator::page_index_ in various places and to make sure we don't overflow the
/// result of that operation we make this int64_t here.
- const int64_t block_capacity_;
+ const int64_t page_capacity_;
const bool has_var_len_slots_;
@@ -269,7 +406,7 @@ class Sorter::Run {
/// resulting from merging other runs.
const bool initial_run_;
- /// True if all blocks in the run are pinned. Initial runs start off pinned and
+ /// True if all pages in the run are pinned. Initial runs start off pinned and
/// can be unpinned. Intermediate runs are always unpinned.
bool is_pinned_;
@@ -281,27 +418,27 @@ class Sorter::Run {
/// Always true for intermediate runs.
bool is_sorted_;
- /// Sequence of blocks in this run containing the fixed-length portion of the sort
+ /// Sequence of pages in this run containing the fixed-length portion of the sort
/// tuples comprising this run. The data pointed to by the var-len slots are in
- /// var_len_blocks_. A run can have zero blocks if no rows are appended.
- /// If the run is sorted, the tuples in fixed_len_blocks_ will be in sorted order.
- /// fixed_len_blocks_[i] is NULL iff it has been transferred or deleted.
- vector<BufferedBlockMgr::Block*> fixed_len_blocks_;
+ /// var_len_pages_. A run can have zero pages if no rows are appended.
+ /// If the run is sorted, the tuples in fixed_len_pages_ will be in sorted order.
+ /// fixed_len_pages_[i] is closed iff it has been transferred or deleted.
+ vector<Page> fixed_len_pages_;
- /// Sequence of blocks in this run containing the var-length data corresponding to the
- /// var-length columns from fixed_len_blocks_. In intermediate runs, the var-len data is
+ /// Sequence of pages in this run containing the var-length data corresponding to the
+ /// var-length columns from fixed_len_pages_. In intermediate runs, the var-len data is
/// always stored in the same order as the fixed-length tuples. In initial runs, the
/// var-len data is initially in unsorted order, but is reshuffled into sorted order in
- /// UnpinAllBlocks(). A run can have no var len blocks if there are no var len slots or
+ /// UnpinAllPages(). A run can have no var len pages if there are no var len slots or
/// if all the var len data is empty or NULL.
- /// var_len_blocks_[i] is NULL iff it has been transferred or deleted.
- vector<BufferedBlockMgr::Block*> var_len_blocks_;
+ /// var_len_pages_[i] is closed iff it has been transferred or deleted.
+ vector<Page> var_len_pages_;
- /// For initial unsorted runs, an extra pinned block is needed to reorder var-len data
- /// into fixed order in UnpinAllBlocks(). 'var_len_copy_block_' stores this extra
- /// block. Deleted in UnpinAllBlocks().
+ /// For initial unsorted runs, an extra pinned page is needed to reorder var-len data
+ /// into fixed order in UnpinAllPages(). 'var_len_copy_page_' stores this extra
+ /// page. Deleted in UnpinAllPages().
/// TODO: in case of in-memory runs, this could be deleted earlier to free up memory.
- BufferedBlockMgr::Block* var_len_copy_block_;
+ Page var_len_copy_page_;
/// Number of tuples added so far to this run.
int64_t num_tuples_;
@@ -313,18 +450,18 @@ class Sorter::Run {
scoped_ptr<RowBatch> buffered_batch_;
/// Members used when a run is read in GetNext().
- /// The index into 'fixed_' and 'var_len_blocks_' of the blocks being read in GetNext().
- int fixed_len_blocks_index_;
- int var_len_blocks_index_;
+ /// The index into 'fixed_len_pages_' and 'var_len_pages_' of the pages being read in
+ /// GetNext().
+ int fixed_len_pages_index_;
+ int var_len_pages_index_;
/// If true, the last call to GetNext() reached the end of the previous fixed or
- /// var-len block. The next call to GetNext() must increment 'fixed_len_blocks_index_'
- /// or 'var_len_blocks_index_'. It must also pin the next block if the run is unpinned.
- bool end_of_fixed_len_block_;
- bool end_of_var_len_block_;
+ /// var-len page. The next call to GetNext() must increment 'fixed_len_pages_index_'
+ /// or 'var_len_pages_index_'. It must also pin the next page if the run is unpinned.
+ bool end_of_fixed_len_page_;
+ bool end_of_var_len_page_;
- /// Offset into the current fixed length data block being processed.
- int fixed_len_block_offset_;
+ /// Offset into the current fixed length data page being processed.
+ int fixed_len_page_offset_;
};
/// Helper class used to iterate over tuples in a run during sorting.
@@ -340,7 +477,7 @@ class Sorter::TupleIterator {
/// Default constructor used for local variable. Produces invalid iterator that must
/// be assigned before use.
TupleIterator() : index_(-1), tuple_(NULL), buffer_start_index_(-1),
- buffer_end_index_(-1), block_index_(-1) { }
+ buffer_end_index_(-1), page_index_(-1) { }
/// Create an iterator pointing to the first tuple in the run.
static inline TupleIterator Begin(Sorter::Run* run) { return TupleIterator(run, 0); }
@@ -351,8 +488,8 @@ class Sorter::TupleIterator {
}
/// Increments 'index_' and sets 'tuple_' to point to the next tuple in the run.
- /// Increments 'block_index_' and advances to the next block if the next tuple is in
- /// the next block. Can be advanced one past the last tuple in the run, but is not
+ /// Increments 'page_index_' and advances to the next page if the next tuple is in
+ /// the next page. Can be advanced one past the last tuple in the run, but is not
/// valid to dereference 'tuple_' in that case. 'run' and 'tuple_size' are passed as
/// arguments to avoid redundantly storing the same values in multiple iterators in
/// perf-critical algorithms.
@@ -370,13 +507,13 @@ class Sorter::TupleIterator {
}
private:
- // Move to the next block in the run (or do nothing if at end of run).
+ // Move to the next page in the run (or do nothing if at end of run).
// This is the slow path for Next();
- void NextBlock(Sorter::Run* run, int tuple_size);
+ void NextPage(Sorter::Run* run, int tuple_size);
- // Move to the previous block in the run (or do nothing if at beginning of run).
+ // Move to the previous page in the run (or do nothing if at beginning of run).
// This is the slow path for Prev();
- void PrevBlock(Sorter::Run* run, int tuple_size);
+ void PrevPage(Sorter::Run* run, int tuple_size);
/// Index of the current tuple in the run.
/// Can be -1 or run->num_rows() if Next() or Prev() moves iterator outside of run.
@@ -387,15 +524,15 @@ class Sorter::TupleIterator {
/// iterator outside of run.
uint8_t* tuple_;
- /// Indices of start and end tuples of block at block_index_. I.e. the current block
+ /// Indices of start and end tuples of page at page_index_. I.e. the current page
/// has tuples with indices in range [buffer_start_index_, buffer_end_index).
int64_t buffer_start_index_;
int64_t buffer_end_index_;
- /// Index into fixed_len_blocks_ of the block containing the current tuple.
- /// If index_ is negative or past end of run, will point to the first or last block
+ /// Index into fixed_len_pages_ of the page containing the current tuple.
+ /// If index_ is negative or past end of run, will point to the first or last page
/// in run respectively.
- int block_index_;
+ int page_index_;
};
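[A minimal sketch, editorial and not part of the patch, of how the iterator is driven
during sorting; 'run' and 'tuple_size' are assumed, Begin() is from the patch, and
End(), index() and tuple() are assumed accessor names for illustration.]

    TupleIterator it = TupleIterator::Begin(run);
    TupleIterator end = TupleIterator::End(run);
    while (it.index() != end.index()) {
      Tuple* t = reinterpret_cast<Tuple*>(it.tuple());
      // ... compare or move 't' as part of the in-place sort ...
      it.Next(run, tuple_size);
    }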
/// Sorts a sequence of tuples from a run in place using a provided tuple comparator.
@@ -404,16 +541,16 @@ class Sorter::TupleIterator {
/// instance to check for cancellation during an in-memory sort.
class Sorter::TupleSorter {
public:
- TupleSorter(const TupleRowComparator& comparator, int64_t block_size,
- int tuple_size, RuntimeState* state);
+ TupleSorter(const TupleRowComparator& comparator, int64_t page_size, int tuple_size,
+ RuntimeState* state);
~TupleSorter();
/// Performs a quicksort for tuples in 'run' followed by an insertion sort to
- /// finish smaller blocks. Only valid to call if this is an initial run that has not
+ /// finish smaller ranges. Only valid to call if this is an initial run that has not
/// yet been sorted. Returns an error status if any error is encountered or if the
/// query is cancelled.
- Status Sort(Run* run);
+ Status Sort(Run* run) WARN_UNUSED_RESULT;
private:
static const int INSERTION_THRESHOLD = 16;
@@ -451,7 +588,8 @@ class Sorter::TupleSorter {
/// Perform an insertion sort for rows in the range [begin, end) in a run.
/// Only valid to call for ranges of size at least 1.
- Status InsertionSort(const TupleIterator& begin, const TupleIterator& end);
+ Status InsertionSort(
+ const TupleIterator& begin, const TupleIterator& end) WARN_UNUSED_RESULT;
/// Partitions the sequence of tuples in the range [begin, end) in a run into two
/// groups around the pivot tuple - i.e. tuples in first group are <= the pivot, and
@@ -459,12 +597,12 @@ class Sorter::TupleSorter {
/// groups and the index to the first element in the second group is returned in 'cut'.
/// Return an error status if any error is encountered or if the query is cancelled.
Status Partition(TupleIterator begin, TupleIterator end, const Tuple* pivot,
- TupleIterator* cut);
+ TupleIterator* cut) WARN_UNUSED_RESULT;
/// Performs a quicksort of rows in the range [begin, end) followed by insertion sort
/// for smaller groups of elements. Return an error status for any errors or if the
/// query is cancelled.
- Status SortHelper(TupleIterator begin, TupleIterator end);
+ Status SortHelper(TupleIterator begin, TupleIterator end) WARN_UNUSED_RESULT;
/// Select a pivot to partition [begin, end).
Tuple* SelectPivot(TupleIterator begin, TupleIterator end);
@@ -477,45 +615,33 @@ class Sorter::TupleSorter {
};
// Sorter::Run methods
-Sorter::Run::Run(Sorter* parent, TupleDescriptor* sort_tuple_desc,
- bool initial_run)
+Sorter::Run::Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool initial_run)
: sorter_(parent),
sort_tuple_desc_(sort_tuple_desc),
sort_tuple_size_(sort_tuple_desc->byte_size()),
- block_capacity_(parent->block_mgr_->max_block_size() / sort_tuple_size_),
+ page_capacity_(parent->page_len_ / sort_tuple_size_),
has_var_len_slots_(sort_tuple_desc->HasVarlenSlots()),
initial_run_(initial_run),
is_pinned_(initial_run),
is_finalized_(false),
is_sorted_(!initial_run),
- var_len_copy_block_(NULL),
- num_tuples_(0) { }
+ num_tuples_(0) {}
Status Sorter::Run::Init() {
- BufferedBlockMgr::Block* block = NULL;
- RETURN_IF_ERROR(
- sorter_->block_mgr_->GetNewBlock(sorter_->block_mgr_client_, NULL, &block));
- if (block == NULL) {
- return sorter_->mem_tracker_->MemLimitExceeded(
- sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "fixed"));
- }
- fixed_len_blocks_.push_back(block);
+ int num_to_create = 1 + has_var_len_slots_ + (has_var_len_slots_ && initial_run_);
+ int64_t required_mem = num_to_create * sorter_->page_len_;
+ if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) {
+ return Status(Substitute(
+ "Unexpected error trying to reserve $0 bytes for a sorted run: $2",
+ required_mem, sorter_->buffer_pool_client_->DebugString()));
+ }
+
+ RETURN_IF_ERROR(AddPage(&fixed_len_pages_));
if (has_var_len_slots_) {
- RETURN_IF_ERROR(
- sorter_->block_mgr_->GetNewBlock(sorter_->block_mgr_client_, NULL, &block));
- if (block == NULL) {
- return sorter_->mem_tracker_->MemLimitExceeded(
- sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable"));
- }
- var_len_blocks_.push_back(block);
+ RETURN_IF_ERROR(AddPage(&var_len_pages_));
if (initial_run_) {
- // Need additional var len block to reorder var len data in UnpinAllBlocks().
- RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock(
- sorter_->block_mgr_client_, NULL, &var_len_copy_block_));
- if (var_len_copy_block_ == NULL) {
- return sorter_->mem_tracker_->MemLimitExceeded(
- sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable"));
- }
+ // Need additional var len page to reorder var len data in UnpinAllPages().
+ RETURN_IF_ERROR(var_len_copy_page_.Init(sorter_));
}
}
if (initial_run_) {
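[A worked example, editorial, of the reservation math in this hunk, using only names
from the patch: for an initial run with var-len slots, num_to_create = 1 + 1 + 1 = 3
(one fixed-len page, one var-len page, and var_len_copy_page_), so required_mem =
3 * page_len_. An intermediate run with var-len slots reserves 2 * page_len_, and a run
without var-len slots reserves just page_len_.]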
@@ -527,14 +653,15 @@ Status Sorter::Run::Init() {
}
template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
-Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_processed) {
+Status Sorter::Run::AddBatchInternal(
+ RowBatch* batch, int start_index, int* num_processed) {
DCHECK(!is_finalized_);
- DCHECK(!fixed_len_blocks_.empty());
+ DCHECK(!fixed_len_pages_.empty());
DCHECK_EQ(HAS_VAR_LEN_SLOTS, has_var_len_slots_);
DCHECK_EQ(INITIAL_RUN, initial_run_);
*num_processed = 0;
- BufferedBlockMgr::Block* cur_fixed_len_block = fixed_len_blocks_.back();
+ Page* cur_fixed_len_page = &fixed_len_pages_.back();
if (!INITIAL_RUN) {
// For intermediate merges, the input row is the sort tuple.
@@ -543,13 +670,13 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_
}
/// Keep initial unsorted runs pinned in memory so we can sort them.
- const AddBlockMode add_mode = INITIAL_RUN ? KEEP_PREV_PINNED : UNPIN_PREV;
+ const AddPageMode add_mode = INITIAL_RUN ? KEEP_PREV_PINNED : UNPIN_PREV;
- // Input rows are copied/materialized into tuples allocated in fixed_len_blocks_.
- // The variable length column data are copied into blocks stored in var_len_blocks_.
+ // Input rows are copied/materialized into tuples allocated in fixed_len_pages_.
+ // The variable length column data are copied into pages stored in var_len_pages_.
// Input row processing is split into two loops.
- // The inner loop processes as many input rows as will fit in cur_fixed_len_block.
- // The outer loop allocates a new block for fixed-len data if the input batch is
+ // The inner loop processes as many input rows as will fit in cur_fixed_len_page.
+ // The outer loop allocates a new page for fixed-len data if the input batch is
// not exhausted.
// cur_input_index is the index into the input 'batch' of the current input row being
@@ -559,22 +686,23 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_
string_values.reserve(sort_tuple_desc_->string_slots().size());
while (cur_input_index < batch->num_rows()) {
// tuples_remaining is the number of tuples to copy/materialize into
- // cur_fixed_len_block.
- int tuples_remaining = cur_fixed_len_block->BytesRemaining() / sort_tuple_size_;
+ // cur_fixed_len_page.
+ int tuples_remaining = cur_fixed_len_page->BytesRemaining() / sort_tuple_size_;
tuples_remaining = min(batch->num_rows() - cur_input_index, tuples_remaining);
for (int i = 0; i < tuples_remaining; ++i) {
int total_var_len = 0;
TupleRow* input_row = batch->GetRow(cur_input_index);
- Tuple* new_tuple = cur_fixed_len_block->Allocate<Tuple>(sort_tuple_size_);
+ Tuple* new_tuple =
+ reinterpret_cast<Tuple*>(cur_fixed_len_page->AllocateBytes(sort_tuple_size_));
if (INITIAL_RUN) {
new_tuple->MaterializeExprs<HAS_VAR_LEN_SLOTS, true>(input_row,
*sort_tuple_desc_, sorter_->sort_tuple_expr_evals_, NULL,
&string_values, &total_var_len);
- if (total_var_len > sorter_->block_mgr_->max_block_size()) {
- return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, Substitute(
- "Variable length data in a single tuple larger than block size
$0 > $1",
- total_var_len, sorter_->block_mgr_->max_block_size())));
+ if (total_var_len > sorter_->page_len_) {
+ return Status(TErrorCode::MAX_ROW_SIZE,
+ PrettyPrinter::Print(total_var_len, TUnit::BYTES), sorter_->node_id_,
+ PrettyPrinter::Print(0, TUnit::BYTES));
}
} else {
memcpy(new_tuple, input_row->GetTuple(0), sort_tuple_size_);
@@ -584,17 +712,17 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_
}
if (HAS_VAR_LEN_SLOTS) {
- DCHECK_GT(var_len_blocks_.size(), 0);
- BufferedBlockMgr::Block* cur_var_len_block = var_len_blocks_.back();
- if (cur_var_len_block->BytesRemaining() < total_var_len) {
+ DCHECK_GT(var_len_pages_.size(), 0);
+ Page* cur_var_len_page = &var_len_pages_.back();
+ if (cur_var_len_page->BytesRemaining() < total_var_len) {
bool added;
- RETURN_IF_ERROR(TryAddBlock(add_mode, &var_len_blocks_, &added));
+ RETURN_IF_ERROR(TryAddPage(add_mode, &var_len_pages_, &added));
if (added) {
- cur_var_len_block = var_len_blocks_.back();
+ cur_var_len_page = &var_len_pages_.back();
} else {
- // There was not enough space in the last var-len block for this tuple, and
+ // There was not enough space in the last var-len page for this tuple, and
// the run could not be extended. Return the fixed-len allocation and exit.
- cur_fixed_len_block->ReturnAllocation(sort_tuple_size_);
+ cur_fixed_len_page->FreeBytes(sort_tuple_size_);
return Status::OK();
}
}
@@ -605,13 +733,13 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_
DCHECK(new_tuple->IsNull(coll_slot->null_indicator_offset()));
}
- uint8_t* var_data_ptr = cur_var_len_block->Allocate<uint8_t>(total_var_len);
+ uint8_t* var_data_ptr = cur_var_len_page->AllocateBytes(total_var_len);
if (INITIAL_RUN) {
CopyVarLenData(string_values, var_data_ptr);
} else {
- DCHECK_EQ(var_len_blocks_.back(), cur_var_len_block);
- CopyVarLenDataConvertOffset(string_values, var_len_blocks_.size() - 1,
- reinterpret_cast<uint8_t*>(cur_var_len_block->buffer()), var_data_ptr);
+ DCHECK_EQ(&var_len_pages_.back(), cur_var_len_page);
+ CopyVarLenDataConvertOffset(string_values, var_len_pages_.size() - 1,
+ cur_var_len_page->data(), var_data_ptr);
}
}
++num_tuples_;
@@ -619,13 +747,13 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_
++cur_input_index;
}
- // If there are still rows left to process, get a new block for the fixed-length
+ // If there are still rows left to process, get a new page for the fixed-length
// tuples. If the run is already too long, return.
if (cur_input_index < batch->num_rows()) {
bool added;
- RETURN_IF_ERROR(TryAddBlock(add_mode, &fixed_len_blocks_, &added));
+ RETURN_IF_ERROR(TryAddPage(add_mode, &fixed_len_pages_, &added));
if (!added) return Status::OK();
- cur_fixed_len_block = fixed_len_blocks_.back();
+ cur_fixed_len_page = &fixed_len_pages_.back();
}
}
return Status::OK();
@@ -634,158 +762,146 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_
Status Sorter::Run::FinalizeInput() {
DCHECK(!is_finalized_);
- RETURN_IF_ERROR(FinalizeBlocks(&fixed_len_blocks_));
+ RETURN_IF_ERROR(FinalizePages(&fixed_len_pages_));
if (has_var_len_slots_) {
- RETURN_IF_ERROR(FinalizeBlocks(&var_len_blocks_));
+ RETURN_IF_ERROR(FinalizePages(&var_len_pages_));
}
is_finalized_ = true;
return Status::OK();
}
-Status Sorter::Run::FinalizeBlocks(vector<BufferedBlockMgr::Block*>* blocks) {
- DCHECK_GT(blocks->size(), 0);
- BufferedBlockMgr::Block* last_block = blocks->back();
- if (last_block->valid_data_len() > 0) {
+Status Sorter::Run::FinalizePages(vector<Page>* pages) {
+ DCHECK_GT(pages->size(), 0);
+ Page* last_page = &pages->back();
+ if (last_page->valid_data_len() > 0) {
DCHECK_EQ(initial_run_, is_pinned_);
if (!is_pinned_) {
- // Unpin the last block of this unpinned run. We've finished writing the run so
- // all blocks in the run can now be unpinned.
- RETURN_IF_ERROR(last_block->Unpin());
+ // Unpin the last page of this unpinned run. We've finished writing the run so
+ // all pages in the run can now be unpinned.
+ last_page->Unpin(sorter_->buffer_pool_client_);
}
} else {
- last_block->Delete();
- blocks->pop_back();
+ last_page->Close(sorter_->buffer_pool_client_);
+ pages->pop_back();
}
return Status::OK();
}
-void Sorter::Run::DeleteAllBlocks() {
- DeleteAndClearBlocks(&fixed_len_blocks_);
- DeleteAndClearBlocks(&var_len_blocks_);
- if (var_len_copy_block_ != NULL) var_len_copy_block_->Delete();
- var_len_copy_block_ = NULL;
+void Sorter::Run::CloseAllPages() {
+ DeleteAndClearPages(&fixed_len_pages_);
+ DeleteAndClearPages(&var_len_pages_);
+ if (var_len_copy_page_.is_open()) {
+ var_len_copy_page_.Close(sorter_->buffer_pool_client_);
+ }
}
-Status Sorter::Run::UnpinAllBlocks() {
+Status Sorter::Run::UnpinAllPages() {
DCHECK(is_sorted_);
DCHECK(initial_run_);
DCHECK(is_pinned_);
DCHECK(is_finalized_);
- // A list of var len blocks to replace 'var_len_blocks_'. Note that after we are done
- // we may have a different number of blocks, because internal fragmentation may leave
- // slightly different amounts of wasted space at the end of each block.
- // We need to be careful to clean up these blocks if we run into an error in this method.
- vector<BufferedBlockMgr::Block*> sorted_var_len_blocks;
- sorted_var_len_blocks.reserve(var_len_blocks_.size());
+ // A list of var len pages to replace 'var_len_pages_'. Note that after we are done
+ // we may have a different number of pages, because internal fragmentation may leave
+ // slightly different amounts of wasted space at the end of each page.
+ // We need to be careful to clean up these pages if we run into an error in this method.
+ vector<Page> sorted_var_len_pages;
+ sorted_var_len_pages.reserve(var_len_pages_.size());
vector<StringValue*> string_values;
int total_var_len;
string_values.reserve(sort_tuple_desc_->string_slots().size());
- BufferedBlockMgr::Block* cur_sorted_var_len_block = NULL;
- if (HasVarLenBlocks()) {
- DCHECK(var_len_copy_block_ != NULL);
- sorted_var_len_blocks.push_back(var_len_copy_block_);
- cur_sorted_var_len_block = var_len_copy_block_;
- // Set var_len_copy_block_ to NULL since it was moved to var_len_blocks_.
- var_len_copy_block_ = NULL;
+ Page* cur_sorted_var_len_page = NULL;
+ if (HasVarLenPages()) {
+ DCHECK(var_len_copy_page_.is_open());
+ sorted_var_len_pages.push_back(move(var_len_copy_page_));
+ cur_sorted_var_len_page = &sorted_var_len_pages.back();
} else if (has_var_len_slots_) {
- // If we don't have any var-len blocks, clean up the copy block.
- DCHECK(var_len_copy_block_ != NULL);
- var_len_copy_block_->Delete();
- var_len_copy_block_ = NULL;
+ // If we don't have any var-len pages, clean up the copy page.
+ DCHECK(var_len_copy_page_.is_open());
+ var_len_copy_page_.Close(sorter_->buffer_pool_client_);
} else {
- DCHECK(var_len_copy_block_ == NULL);
+ DCHECK(!var_len_copy_page_.is_open());
}
Status status;
- for (int i = 0; i < fixed_len_blocks_.size(); ++i) {
- BufferedBlockMgr::Block* cur_fixed_block = fixed_len_blocks_[i];
+ for (int i = 0; i < fixed_len_pages_.size(); ++i) {
+ Page* cur_fixed_page = &fixed_len_pages_[i];
// Skip converting the pointers if no var-len slots, or if all the values are null
// or zero-length. This will possibly leave zero-length pointers pointing to
// arbitrary memory, but zero-length data cannot be dereferenced anyway.
- if (HasVarLenBlocks()) {
- for (int block_offset = 0; block_offset < cur_fixed_block->valid_data_len();
- block_offset += sort_tuple_size_) {
- Tuple* cur_tuple =
- reinterpret_cast<Tuple*>(cur_fixed_block->buffer() + block_offset);
+ if (HasVarLenPages()) {
+ for (int page_offset = 0; page_offset < cur_fixed_page->valid_data_len();
+ page_offset += sort_tuple_size_) {
+ Tuple* cur_tuple = reinterpret_cast<Tuple*>(cur_fixed_page->data() + page_offset);
CollectNonNullVarSlots(cur_tuple, &string_values, &total_var_len);
- DCHECK(cur_sorted_var_len_block != NULL);
- if (cur_sorted_var_len_block->BytesRemaining() < total_var_len) {
+ DCHECK(cur_sorted_var_len_page->is_open());
+ if (cur_sorted_var_len_page->BytesRemaining() < total_var_len) {
bool added;
- status = TryAddBlock(UNPIN_PREV, &sorted_var_len_blocks, &added);
- if (!status.ok()) goto cleanup_blocks;
- DCHECK(added) << "TryAddBlock() with UNPIN_PREV should not fail to
add";
- cur_sorted_var_len_block = sorted_var_len_blocks.back();
+ status = TryAddPage(UNPIN_PREV, &sorted_var_len_pages, &added);
+ if (!status.ok()) goto cleanup_pages;
+ DCHECK(added) << "TryAddPage() with UNPIN_PREV should not fail to
add";
+ cur_sorted_var_len_page = &sorted_var_len_pages.back();
}
- uint8_t* var_data_ptr =
- cur_sorted_var_len_block->Allocate<uint8_t>(total_var_len);
- DCHECK_EQ(sorted_var_len_blocks.back(), cur_sorted_var_len_block);
- CopyVarLenDataConvertOffset(string_values, sorted_var_len_blocks.size() - 1,
- reinterpret_cast<uint8_t*>(cur_sorted_var_len_block->buffer()), var_data_ptr);
+ uint8_t* var_data_ptr = cur_sorted_var_len_page->AllocateBytes(total_var_len);
+ DCHECK_EQ(&sorted_var_len_pages.back(), cur_sorted_var_len_page);
+ CopyVarLenDataConvertOffset(string_values, sorted_var_len_pages.size() - 1,
+ cur_sorted_var_len_page->data(), var_data_ptr);
}
}
- status = cur_fixed_block->Unpin();
- if (!status.ok()) goto cleanup_blocks;
+ cur_fixed_page->Unpin(sorter_->buffer_pool_client_);
}
- if (HasVarLenBlocks()) {
- DCHECK_GT(sorted_var_len_blocks.back()->valid_data_len(), 0);
- status = sorted_var_len_blocks.back()->Unpin();
- if (!status.ok()) goto cleanup_blocks;
+ if (HasVarLenPages()) {
+ DCHECK_GT(sorted_var_len_pages.back().valid_data_len(), 0);
+ sorted_var_len_pages.back().Unpin(sorter_->buffer_pool_client_);
}
- // Clear var_len_blocks_ and replace it with the contents of sorted_var_len_blocks
- DeleteAndClearBlocks(&var_len_blocks_);
- sorted_var_len_blocks.swap(var_len_blocks_);
+ // Clear var_len_pages_ and replace it with the contents of sorted_var_len_pages
+ DeleteAndClearPages(&var_len_pages_);
+ sorted_var_len_pages.swap(var_len_pages_);
is_pinned_ = false;
sorter_->spilled_runs_counter_->Add(1);
return Status::OK();
-cleanup_blocks:
- DeleteAndClearBlocks(&sorted_var_len_blocks);
+cleanup_pages:
+ DeleteAndClearPages(&sorted_var_len_pages);
return status;
}
-Status Sorter::Run::PrepareRead(bool* pinned_all_blocks) {
+Status Sorter::Run::PrepareRead(bool* pinned) {
DCHECK(is_finalized_);
DCHECK(is_sorted_);
- fixed_len_blocks_index_ = 0;
- fixed_len_block_offset_ = 0;
- var_len_blocks_index_ = 0;
- end_of_fixed_len_block_ = end_of_var_len_block_ = fixed_len_blocks_.empty();
+ fixed_len_pages_index_ = 0;
+ fixed_len_page_offset_ = 0;
+ var_len_pages_index_ = 0;
+ end_of_fixed_len_page_ = end_of_var_len_page_ = fixed_len_pages_.empty();
num_tuples_returned_ = 0;
buffered_batch_.reset(new RowBatch(
sorter_->output_row_desc_, sorter_->state_->batch_size(), sorter_->mem_tracker_));
- // If the run is pinned, all blocks are already pinned, so we're ready to read.
+ // If the run is pinned, all pages are already pinned, so we're ready to read.
if (is_pinned_) {
- *pinned_all_blocks = true;
+ *pinned = true;
return Status::OK();
}
- // Attempt to pin the first fixed and var-length blocks. In either case, pinning may
- // fail if the number of reserved blocks is oversubscribed, see IMPALA-1590.
- if (fixed_len_blocks_.size() > 0) {
- bool pinned;
- RETURN_IF_ERROR(fixed_len_blocks_[0]->Pin(&pinned));
- if (!pinned) {
- *pinned_all_blocks = false;
- return Status::OK();
- }
+ int num_to_pin = (fixed_len_pages_.size() > 0 ? 1 : 0) + (HasVarLenPages() ? 1 : 0);
+ int64_t required_mem = num_to_pin * sorter_->page_len_;
+ if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) {
+ *pinned = false;
+ return Status::OK();
}
- if (HasVarLenBlocks()) {
- bool pinned;
- RETURN_IF_ERROR(var_len_blocks_[0]->Pin(&pinned));
- if (!pinned) {
- *pinned_all_blocks = false;
- return Status::OK();
- }
+ // Attempt to pin the first fixed and var-length pages.
+ if (fixed_len_pages_.size() > 0) {
+ RETURN_IF_ERROR(fixed_len_pages_[0].Pin(sorter_->buffer_pool_client_));
}
-
- *pinned_all_blocks = true;
+ if (HasVarLenPages()) {
+ RETURN_IF_ERROR(var_len_pages_[0].Pin(sorter_->buffer_pool_client_));
+ }
+ *pinned = true;
return Status::OK();
}
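[A minimal usage sketch, editorial and not part of the patch, of the retry contract
documented for PrepareRead(); FreeSomeReservation() is a hypothetical helper standing
in for whatever frees up reservation in the caller.]

    bool pinned;
    RETURN_IF_ERROR(run->PrepareRead(&pinned));
    while (!pinned) {
      // Not enough reservation to pin the initial page(s): free memory elsewhere
      // (e.g. unpin or close another run's pages), then retry.
      RETURN_IF_ERROR(FreeSomeReservation());  // hypothetical helper
      RETURN_IF_ERROR(run->PrepareRead(&pinned));
    }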
@@ -794,7 +910,7 @@ Status Sorter::Run::GetNextBatch(RowBatch** output_batch) {
buffered_batch_->Reset();
// Fill more rows into buffered_batch_.
bool eos;
- if (HasVarLenBlocks() && !is_pinned_) {
+ if (HasVarLenPages() && !is_pinned_) {
RETURN_IF_ERROR(GetNext<true>(buffered_batch_.get(), &eos));
} else {
RETURN_IF_ERROR(GetNext<false>(buffered_batch_.get(), &eos));
@@ -804,7 +920,7 @@ Status Sorter::Run::GetNextBatch(RowBatch** output_batch) {
// Setting output_batch to NULL signals eos to the caller, so GetNext() is not
// allowed to attach resources to the batch on eos.
DCHECK_EQ(buffered_batch_->num_rows(), 0);
- DCHECK_EQ(buffered_batch_->num_blocks(), 0);
+ DCHECK_EQ(buffered_batch_->num_buffers(), 0);
*output_batch = NULL;
return Status::OK();
}
@@ -815,122 +931,130 @@ Status Sorter::Run::GetNextBatch(RowBatch** output_batch) {
template <bool CONVERT_OFFSET_TO_PTR>
Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
// Var-len offsets are converted only when reading var-len data from unpinned runs.
- // We shouldn't convert var len offsets if there are no blocks, since in that case
- // they must all be null or zero-length strings, which don't point into a valid block.
- DCHECK_EQ(CONVERT_OFFSET_TO_PTR, HasVarLenBlocks() && !is_pinned_);
+ // We shouldn't convert var len offsets if there are no pages, since in that case
+ // they must all be null or zero-length strings, which don't point into a valid page.
+ DCHECK_EQ(CONVERT_OFFSET_TO_PTR, HasVarLenPages() && !is_pinned_);
- if (end_of_fixed_len_block_ &&
- fixed_len_blocks_index_ >= static_cast<int>(fixed_len_blocks_.size()) - 1) {
+ if (end_of_fixed_len_page_
+ && fixed_len_pages_index_ >= static_cast<int>(fixed_len_pages_.size()) - 1) {
if (is_pinned_) {
- // All blocks were previously attached to output batches. GetNextBatch() assumes
+ // All pages were previously attached to output batches. GetNextBatch() assumes
// that we don't attach resources to the batch on eos.
- DCHECK_EQ(NumNonNullBlocks(fixed_len_blocks_), 0);
- DCHECK_EQ(NumNonNullBlocks(var_len_blocks_), 0);
+ DCHECK_EQ(NumOpenPages(fixed_len_pages_), 0);
+ DCHECK_EQ(NumOpenPages(var_len_pages_), 0);
- // Flush resources in case we are in a subplan and need to allocate more blocks
+ // Flush resources in case we are in a subplan and need to allocate more pages
// when the node is reopened.
output_batch->MarkFlushResources();
} else {
// We held onto the last fixed or var len blocks without transferring them to the
// caller. We signalled MarkNeedsDeepCopy() to the caller, so we can safely delete
// them now to free memory.
- if (!fixed_len_blocks_.empty()) DCHECK_EQ(NumNonNullBlocks(fixed_len_blocks_), 1);
- if (!var_len_blocks_.empty()) DCHECK_EQ(NumNonNullBlocks(var_len_blocks_), 1);
+ if (!fixed_len_pages_.empty()) DCHECK_EQ(NumOpenPages(fixed_len_pages_), 1);
+ if (!var_len_pages_.empty()) DCHECK_EQ(NumOpenPages(var_len_pages_), 1);
}
- DeleteAllBlocks();
+ CloseAllPages();
*eos = true;
DCHECK_EQ(num_tuples_returned_, num_tuples_);
return Status::OK();
}
- // Advance the fixed or var len block if we reached the end in the previous call to
+ // Advance the fixed or var len page if we reached the end in the previous call to
// GetNext().
- if (end_of_fixed_len_block_) {
- RETURN_IF_ERROR(PinNextReadBlock(&fixed_len_blocks_, fixed_len_blocks_index_));
- ++fixed_len_blocks_index_;
- fixed_len_block_offset_ = 0;
- end_of_fixed_len_block_ = false;
- }
- if (end_of_var_len_block_) {
- RETURN_IF_ERROR(PinNextReadBlock(&var_len_blocks_, var_len_blocks_index_));
- ++var_len_blocks_index_;
- end_of_var_len_block_ = false;
- }
-
- // Fills rows into the output batch until a block boundary is reached.
- BufferedBlockMgr::Block* fixed_len_block = fixed_len_blocks_[fixed_len_blocks_index_];
- DCHECK(fixed_len_block != NULL);
- while (!output_batch->AtCapacity() &&
- fixed_len_block_offset_ < fixed_len_block->valid_data_len()) {
- DCHECK(fixed_len_block != NULL);
- Tuple* input_tuple = reinterpret_cast<Tuple*>(
- fixed_len_block->buffer() + fixed_len_block_offset_);
+ if (end_of_fixed_len_page_) {
+ RETURN_IF_ERROR(PinNextReadPage(&fixed_len_pages_, fixed_len_pages_index_));
+ ++fixed_len_pages_index_;
+ fixed_len_page_offset_ = 0;
+ end_of_fixed_len_page_ = false;
+ }
+ if (end_of_var_len_page_) {
+ RETURN_IF_ERROR(PinNextReadPage(&var_len_pages_, var_len_pages_index_));
+ ++var_len_pages_index_;
+ end_of_var_len_page_ = false;
+ }
+
+ // Fills rows into the output batch until a page boundary is reached.
+ Page* fixed_len_page = &fixed_len_pages_[fixed_len_pages_index_];
+ DCHECK(fixed_len_page != NULL);
+
+ // Ensure we have a reference to the fixed-length page's buffer.
+ RETURN_IF_ERROR(fixed_len_page->WaitForBuffer());
+
+ // If we're converting offsets into unpinned var-len pages, make sure the
+ // current var-len page is in memory.
+ if (CONVERT_OFFSET_TO_PTR && HasVarLenPages()) {
+ RETURN_IF_ERROR(var_len_pages_[var_len_pages_index_].WaitForBuffer());
+ }
+
+ while (!output_batch->AtCapacity()
+ && fixed_len_page_offset_ < fixed_len_page->valid_data_len()) {
+ DCHECK(fixed_len_page != NULL);
+ Tuple* input_tuple =
reinterpret_cast<Tuple*>(fixed_len_page->data() + fixed_len_page_offset_);
if (CONVERT_OFFSET_TO_PTR && !ConvertOffsetsToPtrs(input_tuple)) {
DCHECK(!is_pinned_);
- // The var-len data is in the next block. We are done with the current block, so
- // return rows we've accumulated so far and advance to the next block in the next
- // GetNext() call. This is needed for the unpinned case where we need to exchange
- // this block for the next in the next GetNext() call. So therefore we must hold
- // onto the current var-len block and signal to the caller that the block is going
+ // The var-len data is in the next page. We are done with the current page, so
+ // return rows we've accumulated so far and advance to the next page in the next
+ // GetNext() call. This is needed for the unpinned case where we will exchange
+ // this page for the next in the next GetNext() call. So therefore we must hold
+ // onto the current var-len page and signal to the caller that the page is going
// to be deleted.
output_batch->MarkNeedsDeepCopy();
- end_of_var_len_block_ = true;
+ end_of_var_len_page_ = true;
break;
}
output_batch->GetRow(output_batch->AddRow())->SetTuple(0, input_tuple);
output_batch->CommitLastRow();
- fixed_len_block_offset_ += sort_tuple_size_;
+ fixed_len_page_offset_ += sort_tuple_size_;
++num_tuples_returned_;
}
- if (fixed_len_block_offset_ >= fixed_len_block->valid_data_len()) {
- // Reached the block boundary, need to move to the next block.
+ if (fixed_len_page_offset_ >= fixed_len_page->valid_data_len()) {
+ // Reached the page boundary, need to move to the next page.
if (is_pinned_) {
- // Attach block to batch. Caller can delete the block when it wants to.
- output_batch->AddBlock(fixed_len_blocks_[fixed_len_blocks_index_],
+ BufferPool::ClientHandle* client = sorter_->buffer_pool_client_;
+ // Attach page to batch. Caller can delete the page when it wants to.
+ output_batch->AddBuffer(client,
+ fixed_len_pages_[fixed_len_pages_index_].ExtractBuffer(client),
RowBatch::FlushMode::NO_FLUSH_RESOURCES);
- fixed_len_blocks_[fixed_len_blocks_index_] = NULL;
- // Attach the var-len blocks at eos once no more rows will reference the blocks.
- if (fixed_len_blocks_index_ == fixed_len_blocks_.size() - 1) {
- for (BufferedBlockMgr::Block* var_len_block: var_len_blocks_) {
- DCHECK(var_len_block != NULL);
- output_batch->AddBlock(var_len_block, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+ // Attach the var-len pages at eos once no more rows will reference the pages.
+ if (fixed_len_pages_index_ == fixed_len_pages_.size() - 1) {
+ for (Page& var_len_page : var_len_pages_) {
+ DCHECK(var_len_page.is_open());
+ output_batch->AddBuffer(client, var_len_page.ExtractBuffer(client),
+ RowBatch::FlushMode::NO_FLUSH_RESOURCES);
}
- var_len_blocks_.clear();
+ var_len_pages_.clear();
}
} else {
- // To iterate over unpinned runs, we need to exchange this block for the next
- // in the next GetNext() call, so we need to hold onto the block and signal to
- // the caller that the block is going to be deleted.
+ // To iterate over unpinned runs, we need to exchange this page for the next
+ // in the next GetNext() call, so we need to hold onto the page and signal to
+ // the caller that the page is going to be deleted.
output_batch->MarkNeedsDeepCopy();
}
- end_of_fixed_len_block_ = true;
+ end_of_fixed_len_page_ = true;
}
*eos = false;
return Status::OK();
}
-Status Sorter::Run::PinNextReadBlock(vector<BufferedBlockMgr::Block*>* blocks,
- int block_index) {
- DCHECK_GE(block_index, 0);
- DCHECK_LT(block_index, blocks->size() - 1);
- BufferedBlockMgr::Block* curr_block = (*blocks)[block_index];
- BufferedBlockMgr::Block* next_block = (*blocks)[block_index + 1];
- DCHECK_EQ(is_pinned_, next_block->is_pinned());
+Status Sorter::Run::PinNextReadPage(vector<Page>* pages, int page_index) {
+ DCHECK_GE(page_index, 0);
+ DCHECK_LT(page_index, pages->size() - 1);
+ Page* curr_page = &(*pages)[page_index];
+ Page* next_page = &(*pages)[page_index + 1];
+ DCHECK_EQ(is_pinned_, next_page->is_pinned());
if (is_pinned_) {
- // The current block was attached to a batch and 'next_block' is already pinned.
- DCHECK(curr_block == NULL);
+ // The current page was attached to a batch and 'next_page' is already pinned.
+ DCHECK(!curr_page->is_open());
return Status::OK();
}
- bool pinned;
- // Atomically delete the previous block and pin this one. Should not fail due to lack
- // of memory. Pin() deletes the block even in error cases, so we need to remove it from
- // the vector first to avoid an inconsistent state.
- (*blocks)[block_index] = NULL;
- RETURN_IF_ERROR(next_block->Pin(&pinned, curr_block, false));
- DCHECK(pinned) << "Atomic delete and pin should not fail without error.";
+ // Close the previous page to free memory and pin the next page. Should always succeed
+ // since the pages are the same size.
+ curr_page->Close(sorter_->buffer_pool_client_);
+ RETURN_IF_ERROR(next_page->Pin(sorter_->buffer_pool_client_));
return Status::OK();
}
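The Close()-then-Pin() ordering above is what keeps the exchange reservation-neutral: closing the current page returns its bytes to the client before the next page's pin consumes the same amount. A minimal standalone sketch of that accounting, using a hypothetical client model rather than Impala's BufferPool API:

    // Hypothetical reservation model (not Impala's API): shows why closing the
    // current page before pinning the next cannot fail when pages are equal-sized.
    #include <cassert>
    #include <cstdint>

    struct FakeClient {
      int64_t reservation;  // total bytes granted to this client
      int64_t used;         // bytes currently backing pinned pages

      // Closing a pinned page hands its bytes back to the reservation.
      void Close(int64_t page_len) { used -= page_len; }

      // Pinning needs 'page_len' bytes of unused reservation.
      bool Pin(int64_t page_len) {
        if (used + page_len > reservation) return false;
        used += page_len;
        return true;
      }
    };

    int main() {
      const int64_t kPageLen = 2 * 1024 * 1024;  // page size chosen for illustration
      FakeClient client{kPageLen, kPageLen};     // one page of reservation, fully used
      client.Close(kPageLen);                    // free the current read page first...
      assert(client.Pin(kPageLen));              // ...so the next pin always fits
      return 0;
    }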
@@ -948,28 +1072,29 @@ void Sorter::Run::CollectNonNullVarSlots(Tuple* src,
}
}
-Status Sorter::Run::TryAddBlock(AddBlockMode mode,
- vector<BufferedBlockMgr::Block*>* block_sequence, bool* added) {
- DCHECK(!block_sequence->empty());
- BufferedBlockMgr::Block* prev_block;
+Status Sorter::Run::TryAddPage(
+ AddPageMode mode, vector<Page>* page_sequence, bool* added) {
+ DCHECK(!page_sequence->empty());
if (mode == KEEP_PREV_PINNED) {
- prev_block = NULL;
+ if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(sorter_->page_len_)) {
+ *added = false;
+ return Status::OK();
+ }
} else {
DCHECK(mode == UNPIN_PREV);
- // Swap the prev block with the next, to guarantee success.
- prev_block = block_sequence->back();
+ // Unpin the prev page to free up the memory required to pin the next page.
+ page_sequence->back().Unpin(sorter_->buffer_pool_client_);
}
- BufferedBlockMgr::Block* new_block;
- RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock(
- sorter_->block_mgr_client_, prev_block, &new_block));
- if (new_block != NULL) {
- *added = true;
- block_sequence->push_back(new_block);
- } else {
- DCHECK_EQ(mode, KEEP_PREV_PINNED);
- *added = false;
- }
+ RETURN_IF_ERROR(AddPage(page_sequence));
+ *added = true;
+ return Status::OK();
+}
+
+Status Sorter::Run::AddPage(vector<Page>* page_sequence) {
+ Page new_page;
+ RETURN_IF_ERROR(new_page.Init(sorter_));
+ page_sequence->push_back(move(new_page));
return Status::OK();
}
@@ -983,27 +1108,26 @@ void Sorter::Run::CopyVarLenData(const vector<StringValue*>& string_values,
}
void Sorter::Run::CopyVarLenDataConvertOffset(const vector<StringValue*>& string_values,
- int block_index, const uint8_t* block_start, uint8_t* dest) {
- DCHECK_GE(block_index, 0);
- DCHECK_GE(dest - block_start, 0);
+ int page_index, const uint8_t* page_start, uint8_t* dest) {
+ DCHECK_GE(page_index, 0);
+ DCHECK_GE(dest - page_start, 0);
- for (StringValue* string_val: string_values) {
+ for (StringValue* string_val : string_values) {
memcpy(dest, string_val->ptr, string_val->len);
- DCHECK_LE(dest - block_start, sorter_->block_mgr_->max_block_size());
- DCHECK_LE(dest - block_start, INT_MAX);
- int block_offset = dest - block_start;
- uint64_t packed_offset =
- (static_cast<uint64_t>(block_index) << 32) | block_offset;
+ DCHECK_LE(dest - page_start, sorter_->page_len_);
+ DCHECK_LE(dest - page_start, numeric_limits<uint32_t>::max());
+ uint32_t page_offset = dest - page_start;
+ uint64_t packed_offset = (static_cast<uint64_t>(page_index) << 32) | page_offset;
string_val->ptr = reinterpret_cast<char*>(packed_offset);
dest += string_val->len;
}
}
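The packing scheme above can be exercised in isolation. A self-contained sketch (helper names are illustrative, not from the patch) that mirrors the shift-and-mask logic used by CopyVarLenDataConvertOffset() and ConvertOffsetsToPtrs():

    #include <cassert>
    #include <cstdint>

    // Page index goes in the upper 32 bits, byte offset within the page in the
    // lower 32 bits; the result is stored in the tuple's StringValue::ptr field.
    static uint64_t PackOffset(uint32_t page_index, uint32_t page_offset) {
      return (static_cast<uint64_t>(page_index) << 32) | page_offset;
    }

    static void UnpackOffset(uint64_t packed, uint32_t* page_index, uint32_t* page_offset) {
      *page_index = packed >> 32;
      *page_offset = packed & 0xFFFFFFFF;
    }

    int main() {
      uint64_t packed = PackOffset(3, 4096);  // 4th var-len page, offset 4KB
      uint32_t index, offset;
      UnpackOffset(packed, &index, &offset);
      assert(index == 3 && offset == 4096);
      return 0;
    }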
bool Sorter::Run::ConvertOffsetsToPtrs(Tuple* tuple) {
- // We need to be careful to handle the case where var_len_blocks_ is empty,
+ // We need to be careful to handle the case where var_len_pages_ is empty,
// e.g. if all strings are NULL.
- uint8_t* block_start = var_len_blocks_.empty() ? NULL :
- var_len_blocks_[var_len_blocks_index_]->buffer();
+ uint8_t* page_start =
+ var_len_pages_.empty() ? NULL : var_len_pages_[var_len_pages_index_].data();
const vector<SlotDescriptor*>& string_slots = sort_tuple_desc_->string_slots();
int num_non_null_string_slots = 0;
@@ -1015,47 +1139,47 @@ bool Sorter::Run::ConvertOffsetsToPtrs(Tuple* tuple) {
DCHECK(slot_desc->type().IsVarLenStringType());
StringValue* value = reinterpret_cast<StringValue*>(
tuple->GetSlot(slot_desc->tuple_offset()));
- // packed_offset includes the block index in the upper 32 bits and the block
+ // packed_offset includes the page index in the upper 32 bits and the page
// offset in the lower 32 bits. See CopyVarLenDataConvertOffset().
uint64_t packed_offset = reinterpret_cast<uint64_t>(value->ptr);
- int block_index = packed_offset >> 32;
- int block_offset = packed_offset & 0xFFFFFFFF;
+ uint32_t page_index = packed_offset >> 32;
+ uint32_t page_offset = packed_offset & 0xFFFFFFFF;
- if (block_index > var_len_blocks_index_) {
- // We've reached the block boundary for the current var-len block.
+ if (page_index > var_len_pages_index_) {
+ // We've reached the page boundary for the current var-len page.
// This tuple will be returned in the next call to GetNext().
- DCHECK_GE(block_index, 0);
- DCHECK_LE(block_index, var_len_blocks_.size());
- DCHECK_EQ(block_index, var_len_blocks_index_ + 1);
- DCHECK_EQ(block_offset, 0); // The data is the first thing in the next block.
+ DCHECK_GE(page_index, 0);
+ DCHECK_LE(page_index, var_len_pages_.size());
+ DCHECK_EQ(page_index, var_len_pages_index_ + 1);
+ DCHECK_EQ(page_offset, 0); // The data is the first thing in the next page.
// This must be the first slot with var len data for the tuple. Var len data
// for tuple shouldn't be split across blocks.
DCHECK_EQ(num_non_null_string_slots, 1);
return false;
}
- DCHECK_EQ(block_index, var_len_blocks_index_);
- if (var_len_blocks_.empty()) {
+ DCHECK_EQ(page_index, var_len_pages_index_);
+ if (var_len_pages_.empty()) {
DCHECK_EQ(value->len, 0);
} else {
- DCHECK_LE(block_offset + value->len, var_len_blocks_[block_index]->valid_data_len());
+ DCHECK_LE(page_offset + value->len, var_len_pages_[page_index].valid_data_len());
}
// Calculate the address implied by the offset and assign it. May be NULL for
- // zero-length strings if there are no blocks in the run since block_start is NULL.
- DCHECK(block_start != NULL || block_offset == 0);
- value->ptr = reinterpret_cast<char*>(block_start + block_offset);
+ // zero-length strings if there are no pages in the run since page_start is NULL.
+ DCHECK(page_start != NULL || page_offset == 0);
+ value->ptr = reinterpret_cast<char*>(page_start + page_offset);
}
return true;
}
int64_t Sorter::Run::TotalBytes() const {
int64_t total_bytes = 0;
- for (BufferedBlockMgr::Block* block: fixed_len_blocks_) {
- if (block != NULL) total_bytes += block->valid_data_len();
+ for (const Page& page : fixed_len_pages_) {
+ if (page.is_open()) total_bytes += page.valid_data_len();
}
- for (BufferedBlockMgr::Block* block: var_len_blocks_) {
- if (block != NULL) total_bytes += block->valid_data_len();
+ for (const Page& page : var_len_pages_) {
+ if (page.is_open()) total_bytes += page.valid_data_len();
}
return total_bytes;
}
@@ -1072,61 +1196,61 @@ Sorter::TupleIterator::TupleIterator(Sorter::Run* run, int64_t index)
}
const int tuple_size = run->sort_tuple_size_;
- int block_offset;
+ uint32_t page_offset;
if (UNLIKELY(index == run->num_tuples())) {
// If the iterator is initialized past the end, set up buffer_start_index_,
- // 'buffer_end_index_' and 'block_index_' for the last block, then set 'tuple' to
+ // 'buffer_end_index_' and 'page_index_' for the last page, then set 'tuple' to
// 'tuple_size' bytes past the last tuple, so everything is correct when Prev() is
// invoked.
- block_index_ = run->fixed_len_blocks_.size() - 1;
- block_offset = ((index - 1) % run->block_capacity_) * tuple_size + tuple_size;
+ page_index_ = run->fixed_len_pages_.size() - 1;
+ page_offset = ((index - 1) % run->page_capacity_) * tuple_size + tuple_size;
} else {
- block_index_ = index / run->block_capacity_;
- block_offset = (index % run->block_capacity_) * tuple_size;
+ page_index_ = index / run->page_capacity_;
+ page_offset = (index % run->page_capacity_) * tuple_size;
}
- buffer_start_index_ = block_index_ * run->block_capacity_;
- buffer_end_index_ = buffer_start_index_ + run->block_capacity_;
- tuple_ = run->fixed_len_blocks_[block_index_]->buffer() + block_offset;
+ buffer_start_index_ = page_index_ * run->page_capacity_;
+ buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
+ tuple_ = run->fixed_len_pages_[page_index_].data() + page_offset;
}
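The constructor's index arithmetic maps a global tuple index to a (page, byte offset) pair: with page_capacity_ tuples per fixed-length page, tuple i lives in page i / page_capacity_ at byte offset (i % page_capacity_) * tuple_size. A small standalone check of that mapping (values are illustrative):

    #include <cassert>
    #include <cstdint>

    int main() {
      const int64_t page_capacity = 1000;  // tuples per fixed-len page (assumed)
      const int tuple_size = 16;           // bytes per sort tuple (assumed)
      int64_t index = 2500;                // global tuple index

      int64_t page_index = index / page_capacity;                 // page 2
      int64_t page_offset = (index % page_capacity) * tuple_size; // byte 8000
      assert(page_index == 2 && page_offset == 8000);

      // The contiguous index range served by this page's buffer:
      int64_t buffer_start_index = page_index * page_capacity;        // 2000
      int64_t buffer_end_index = buffer_start_index + page_capacity;  // 3000
      assert(index >= buffer_start_index && index < buffer_end_index);
      return 0;
    }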
void Sorter::TupleIterator::Next(Sorter::Run* run, int tuple_size) {
DCHECK_LT(index_, run->num_tuples()) << "Can only advance one past end of run";
tuple_ += tuple_size;
++index_;
- if (UNLIKELY(index_ >= buffer_end_index_)) NextBlock(run, tuple_size);
+ if (UNLIKELY(index_ >= buffer_end_index_)) NextPage(run, tuple_size);
}
-void Sorter::TupleIterator::NextBlock(Sorter::Run* run, int tuple_size) {
- // When moving after the last tuple, stay at the last block.
+void Sorter::TupleIterator::NextPage(Sorter::Run* run, int tuple_size) {
+ // When moving after the last tuple, stay at the last page.
if (index_ >= run->num_tuples()) return;
- ++block_index_;
- DCHECK_LT(block_index_, run->fixed_len_blocks_.size());
- buffer_start_index_ = block_index_ * run->block_capacity_;
+ ++page_index_;
+ DCHECK_LT(page_index_, run->fixed_len_pages_.size());
+ buffer_start_index_ = page_index_ * run->page_capacity_;
DCHECK_EQ(index_, buffer_start_index_);
- buffer_end_index_ = buffer_start_index_ + run->block_capacity_;
- tuple_ = run->fixed_len_blocks_[block_index_]->buffer();
+ buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
+ tuple_ = run->fixed_len_pages_[page_index_].data();
}
void Sorter::TupleIterator::Prev(Sorter::Run* run, int tuple_size) {
DCHECK_GE(index_, 0) << "Can only advance one before start of run";
tuple_ -= tuple_size;
--index_;
- if (UNLIKELY(index_ < buffer_start_index_)) PrevBlock(run, tuple_size);
+ if (UNLIKELY(index_ < buffer_start_index_)) PrevPage(run, tuple_size);
}
-void Sorter::TupleIterator::PrevBlock(Sorter::Run* run, int tuple_size) {
- // When moving before the first tuple, stay at the first block.
+void Sorter::TupleIterator::PrevPage(Sorter::Run* run, int tuple_size) {
+ // When moving before the first tuple, stay at the first page.
if (index_ < 0) return;
- --block_index_;
- DCHECK_GE(block_index_, 0);
- buffer_start_index_ = block_index_ * run->block_capacity_;
- buffer_end_index_ = buffer_start_index_ + run->block_capacity_;
+ --page_index_;
+ DCHECK_GE(page_index_, 0);
+ buffer_start_index_ = page_index_ * run->page_capacity_;
+ buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
DCHECK_EQ(index_, buffer_end_index_ - 1);
- int last_tuple_block_offset = run->sort_tuple_size_ * (run->block_capacity_ - 1);
- tuple_ = run->fixed_len_blocks_[block_index_]->buffer() + last_tuple_block_offset;
+ int last_tuple_page_offset = run->sort_tuple_size_ * (run->page_capacity_ - 1);
+ tuple_ = run->fixed_len_pages_[page_index_].data() + last_tuple_page_offset;
}
-Sorter::TupleSorter::TupleSorter(const TupleRowComparator& comp, int64_t block_size,
+Sorter::TupleSorter::TupleSorter(const TupleRowComparator& comp, int64_t page_size,
int tuple_size, RuntimeState* state)
: tuple_size_(tuple_size),
comparator_(comp),
@@ -1340,13 +1464,15 @@ inline void Sorter::TupleSorter::Swap(Tuple* left, Tuple* right, Tuple* swap_tup
Sorter::Sorter(const TupleRowComparator& compare_less_than,
const vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
- MemTracker* mem_tracker, RuntimeProfile* profile, RuntimeState* state,
+ MemTracker* mem_tracker, BufferPool::ClientHandle* buffer_pool_client,
+ int64_t page_len, RuntimeProfile* profile, RuntimeState* state, int node_id,
bool enable_spilling)
- : state_(state),
+ : node_id_(node_id),
+ state_(state),
compare_less_than_(compare_less_than),
in_mem_tuple_sorter_(NULL),
- block_mgr_(state->block_mgr()),
- block_mgr_client_(NULL),
+ buffer_pool_client_(buffer_pool_client),
+ page_len_(page_len),
has_var_len_slots_(false),
sort_tuple_exprs_(sort_tuple_exprs),
mem_tracker_(mem_tracker),
@@ -1370,10 +1496,24 @@ Sorter::~Sorter() {
Status Sorter::Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) {
DCHECK(in_mem_tuple_sorter_ == NULL) << "Already prepared";
+ // Page byte offsets are packed into uint32_t values, which limits the supported
+ // page size.
+ if (page_len_ > numeric_limits<uint32_t>::max()) {
+ return Status(Substitute(
+ "Page size $0 exceeded maximum supported in sorter ($1)",
+ PrettyPrinter::PrintBytes(page_len_),
+ PrettyPrinter::PrintBytes(numeric_limits<uint32_t>::max())));
+ }
+
TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0];
+ if (sort_tuple_desc->byte_size() > page_len_) {
+ return Status(TErrorCode::MAX_ROW_SIZE,
+ PrettyPrinter::Print(sort_tuple_desc->byte_size(), TUnit::BYTES), node_id_,
+ PrettyPrinter::Print(0, TUnit::BYTES));
+ }
has_var_len_slots_ = sort_tuple_desc->HasVarlenSlots();
- in_mem_tuple_sorter_.reset(new TupleSorter(compare_less_than_,
- block_mgr_->max_block_size(), sort_tuple_desc->byte_size(), state_));
+ in_mem_tuple_sorter_.reset(new TupleSorter(compare_less_than_, page_len_,
+ sort_tuple_desc->byte_size(), state_));
initial_runs_counter_ = ADD_COUNTER(profile_, "InitialRunsCreated", TUnit::UNIT);
spilled_runs_counter_ = ADD_COUNTER(profile_, "SpilledRuns", TUnit::UNIT);
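The new uint32_t page-size check in Prepare() above follows directly from the offset packing: a byte offset within a page must fit in the lower 32 bits of a packed offset, so the largest usable page is 2^32 - 1 bytes. A quick standalone verification of that bound:

    #include <cassert>
    #include <cstdint>
    #include <limits>

    int main() {
      const int64_t four_gb = int64_t{1} << 32;
      // The largest page whose offsets still fit in a uint32_t:
      assert(four_gb - 1 <= std::numeric_limits<uint32_t>::max());
      // Anything 4GB or larger would be rejected by the Prepare() check.
      assert(four_gb > std::numeric_limits<uint32_t>::max());
      return 0;
    }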
@@ -1382,17 +1522,6 @@ Status Sorter::Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) {
sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES);
run_sizes_ = ADD_SUMMARY_STATS_COUNTER(profile_, "NumRowsPerRun", TUnit::UNIT);
- // If spilling is enabled, we need enough buffers to perform merges. Otherwise, there
- // won't be any merges and we only need 1 buffer.
- // Must be kept in sync with SortNode.computeResourceProfile() in fe.
- int min_buffers_required = enable_spilling_ ? MIN_BUFFERS_PER_MERGE : 1;
- // Fixed and var-length blocks are separate, so we need twice as many blocks for both if
- // there is var-length data.
- if (sort_tuple_desc->HasVarlenSlots()) min_buffers_required *= 2;
-
- RETURN_IF_ERROR(block_mgr_->RegisterClient(Substitute("Sorter ptr=$0", this),
- min_buffers_required, false, mem_tracker_, state_, &block_mgr_client_));
-
RETURN_IF_ERROR(ScalarExprEvaluator::Create(sort_tuple_exprs_, state_, obj_pool,
expr_mem_pool, &sort_tuple_expr_evals_));
return Status::OK();
@@ -1413,6 +1542,15 @@ void Sorter::FreeLocalAllocations() {
ScalarExprEvaluator::FreeLocalAllocations(sort_tuple_expr_evals_);
}
+int64_t Sorter::ComputeMinReservation() {
+ // Must be kept in sync with SortNode.computeNodeResourceProfile() in fe.
+ int min_buffers_required = enable_spilling_ ? MIN_BUFFERS_PER_MERGE : 1;
+ // Fixed and var-length pages are separate, so we need double the pages
+ // if there is var-length data.
+ if (output_row_desc_->HasVarlenSlots()) min_buffers_required *= 2;
+ return min_buffers_required * page_len_;
+}
+
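As a worked example of ComputeMinReservation(): with spilling enabled the sorter needs MIN_BUFFERS_PER_MERGE (3) pages, doubled to 6 when the sort tuple has var-length slots, since fixed- and var-length data live in separate pages. The page size below is assumed purely for illustration:

    #include <cstdint>
    #include <iostream>

    int main() {
      const int kMinBuffersPerMerge = 3;         // mirrors MIN_BUFFERS_PER_MERGE
      const int64_t page_len = 2 * 1024 * 1024;  // assumed 2MB page size
      const bool enable_spilling = true;
      const bool has_var_len_slots = true;

      int min_buffers = enable_spilling ? kMinBuffersPerMerge : 1;
      if (has_var_len_slots) min_buffers *= 2;      // separate fixed/var-len pages
      std::cout << min_buffers * page_len << "\n";  // 12582912 bytes (12MB)
      return 0;
    }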
Status Sorter::AddBatch(RowBatch* batch) {
DCHECK(unsorted_run_ != NULL);
DCHECK(batch != NULL);
@@ -1424,11 +1562,12 @@ Status Sorter::AddBatch(RowBatch* batch) {
cur_batch_index += num_processed;
if (cur_batch_index < batch->num_rows()) {
- // The current run is full. Sort it and begin the next one.
+ // The current run is full. Sort it, spill it and begin the next one.
+ RETURN_IF_ERROR(state_->StartSpilling(mem_tracker_));
RETURN_IF_ERROR(SortCurrentInputRun());
- RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks());
- unsorted_run_ = obj_pool_.Add(
- new Run(this, output_row_desc_->tuple_descriptors()[0], true));
+ RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
+ unsorted_run_ =
+ obj_pool_.Add(new Run(this, output_row_desc_->tuple_descriptors()[0], true));
RETURN_IF_ERROR(unsorted_run_->Init());
}
}
@@ -1459,7 +1598,7 @@ Status Sorter::InputDone() {
// Unpin the final run to free up memory for the merge.
// TODO: we could keep it in memory in some circumstances as an optimisation, once
// we have a buffer pool with more reliable reservations (IMPALA-3200).
- RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks());
+ RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
// Merge intermediate runs until we have a final merge set-up.
// TODO: Attempt to allocate more memory before doing intermediate merges. This may
@@ -1487,7 +1626,6 @@ void Sorter::Reset() {
void Sorter::Close(RuntimeState* state) {
CleanupAllRuns();
- block_mgr_->ClearReservations(block_mgr_client_);
obj_pool_.Clear();
ScalarExprEvaluator::Close(sort_tuple_expr_evals_, state);
}
@@ -1495,9 +1633,9 @@ void Sorter::Close(RuntimeState* state) {
void Sorter::CleanupAllRuns() {
Run::CleanupRuns(&sorted_runs_);
Run::CleanupRuns(&merging_runs_);
- if (unsorted_run_ != NULL) unsorted_run_->DeleteAllBlocks();
+ if (unsorted_run_ != NULL) unsorted_run_->CloseAllPages();
unsorted_run_ = NULL;
- if (merge_output_run_ != NULL) merge_output_run_->DeleteAllBlocks();
+ if (merge_output_run_ != NULL) merge_output_run_->CloseAllPages();
merge_output_run_ = NULL;
}
@@ -1519,10 +1657,10 @@ Status Sorter::SortCurrentInputRun() {
Status Sorter::MergeIntermediateRuns() {
DCHECK_GE(sorted_runs_.size(), 2);
- int pinned_blocks_per_run = has_var_len_slots_ ? 2 : 1;
- int max_runs_per_final_merge = MAX_BUFFERS_PER_MERGE / pinned_blocks_per_run;
+ int pinned_pages_per_run = has_var_len_slots_ ? 2 : 1;
+ int max_runs_per_final_merge = MAX_BUFFERS_PER_MERGE / pinned_pages_per_run;
- // During an intermediate merge, the one or two blocks from the output sorted run
+ // During an intermediate merge, the one or two pages from the output sorted run
// that are being written must be pinned.
int max_runs_per_intermediate_merge = max_runs_per_final_merge - 1;
DCHECK_GT(max_runs_per_intermediate_merge, 1);
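The merge fanout implied by the hunk above, spelled out: with var-length data each input run keeps two pages pinned (one fixed-length, one var-length), so a final merge can read at most MAX_BUFFERS_PER_MERGE / 2 = 64 runs at once, and an intermediate merge must also keep its output run's pages pinned, leaving room for one fewer input run. A standalone check of that arithmetic:

    #include <cassert>

    int main() {
      const int kMaxBuffersPerMerge = 128;  // mirrors MAX_BUFFERS_PER_MERGE
      int pinned_pages_per_run = 2;         // has_var_len_slots_ == true
      int max_runs_per_final_merge = kMaxBuffersPerMerge / pinned_pages_per_run;
      int max_runs_per_intermediate_merge = max_runs_per_final_merge - 1;
      assert(max_runs_per_final_merge == 64);
      assert(max_runs_per_intermediate_merge == 63);
      return 0;
    }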
@@ -1549,7 +1687,7 @@ Status Sorter::MergeIntermediateRuns() {
if (sorted_runs_.empty()) {
// Don't need intermediate run for final merge.
if (merge_output_run_ != NULL) {
- merge_output_run_->DeleteAllBlocks();
+ merge_output_run_->CloseAllPages();
merge_output_run_ = NULL;
}
return Status::OK();
@@ -1604,7 +1742,8 @@ Status Sorter::CreateMerger(int max_num_runs) {
}
Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) {
- RowBatch intermediate_merge_batch(output_row_desc_, state_->batch_size(), mem_tracker_);
+ RowBatch intermediate_merge_batch(
+ output_row_desc_, state_->batch_size(), mem_tracker_);
bool eos = false;
while (!eos) {
// Copy rows into the new run until done.
@@ -1621,5 +1760,4 @@ Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) {
RETURN_IF_ERROR(merged_run->FinalizeInput());
return Status::OK();
}
-
} // namespace impala