http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index b4ef279..ee0e4be 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -17,15 +17,20 @@
 
 #include "runtime/sorter.h"
 
+#include <limits>
+
 #include <boost/random/mersenne_twister.hpp>
 #include <boost/random/uniform_int.hpp>
 #include <gutil/strings/substitute.h>
 
-#include "runtime/buffered-block-mgr.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/sorted-run-merger.h"
+#include "util/pretty-printer.h"
 #include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
@@ -36,7 +41,7 @@ using namespace strings;
 
 namespace impala {
 
-// Number of pinned blocks required for a merge with fixed-length data only.
+// Number of pinned pages required for a merge with fixed-length data only.
 const int MIN_BUFFERS_PER_MERGE = 3;
 
 // Maximum number of buffers to use in each merge to prevent sorter trying to grab
@@ -46,35 +51,140 @@ const int MIN_BUFFERS_PER_MERGE = 3;
 // we should base this on the number of reservations.
 const int MAX_BUFFERS_PER_MERGE = 128;
 
-const string MEM_ALLOC_FAILED_ERROR_MSG = "Failed to allocate block for $0-length "
-    "data needed for sorting. Reducing query concurrency or increasing the "
-    "memory limit may help this query to complete successfully.";
-
-const string MERGE_FAILED_ERROR_MSG = "Failed to allocate block to merge spilled runs "
+const string MERGE_FAILED_ERROR_MSG = "Failed to allocate page to merge spilled runs "
     "during sorting. Only $0 runs could be merged, but must be able to merge at least 2 "
     "to make progress. Reducing query concurrency or increasing the memory limit may "
     "help this query to complete successfully.";
 
-/// Delete all non-null blocks in blocks and clear vector.
-static void DeleteAndClearBlocks(vector<BufferedBlockMgr::Block*>* blocks) {
-  for (BufferedBlockMgr::Block* block: *blocks) {
-    if (block != NULL) block->Delete();
+/// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
+/// The Page can be in four states:
+/// * Closed: The page starts in this state before Init() is called. Calling
+///   ExtractBuffer() or Close() puts the page back in this state. No other operations
+///   are valid on a closed page.
+/// * In memory: the page is pinned and the buffer is in memory. data() is valid. The
+///   page is in this state after Init(). If the page is pinned but not in memory, it
+///   can be brought into this state by calling WaitForBuffer().
+/// * Unpinned: the page was unpinned by calling Unpin(). It is invalid to access the
+///   page's buffer.
+/// * Pinned but not in memory: Pin() was called on the unpinned page, but
+///   WaitForBuffer() has not been called. It is invalid to access the page's buffer.
+class Sorter::Page {
+ public:
+  Page() { Reset(); }
+
+  /// Create a new page of length 'sorter->page_len_' bytes using
+  /// 'sorter->buffer_pool_client_'. Caller must ensure the client has enough
+  /// reservation for the page.
+  Status Init(Sorter* sorter) WARN_UNUSED_RESULT {
+    const BufferPool::BufferHandle* page_buffer;
+    RETURN_IF_ERROR(pool()->CreatePage(sorter->buffer_pool_client_, sorter->page_len_,
+        &handle_, &page_buffer));
+    data_ = page_buffer->data();
+    return Status::OK();
   }
-  blocks->clear();
-}
 
-static int NumNonNullBlocks(const vector<BufferedBlockMgr::Block*>& blocks) {
-  int count = 0;
-  for (BufferedBlockMgr::Block* block: blocks) {
-    if (block != NULL) ++count;
+  /// Extract the buffer from the page. The page must be in memory. When this function
+  /// returns the page is closed.
+  BufferPool::BufferHandle ExtractBuffer(BufferPool::ClientHandle* client) {
+    DCHECK(data_ != nullptr) << "Page must be in memory";
+    BufferPool::BufferHandle buffer;
+    Status status = pool()->ExtractBuffer(client, &handle_, &buffer);
+    DCHECK(status.ok()) << "Page was in memory, ExtractBuffer() shouldn't 
fail";
+    Reset();
+    return buffer;
+  }
+
+  /// Allocate 'len' bytes in the current page. The page must be in memory, and the
+  /// amount to allocate cannot exceed BytesRemaining().
+  uint8_t* AllocateBytes(int64_t len) {
+    DCHECK_GE(len, 0);
+    DCHECK_LE(len, BytesRemaining());
+    DCHECK(data_ != nullptr);
+    uint8_t* result = data_ + valid_data_len_;
+    valid_data_len_ += len;
+    return result;
+  }
+
+  /// Free the last 'len' bytes allocated from AllocateBytes(). The page must be in
+  /// memory.
+  void FreeBytes(int64_t len) {
+    DCHECK_GE(len, 0);
+    DCHECK_LE(len, valid_data_len_);
+    DCHECK(data_ != nullptr);
+    valid_data_len_ -= len;
+  }
+
+  /// Return number of bytes remaining in page.
+  int64_t BytesRemaining() { return len() - valid_data_len_; }
+
+  /// Brings a pinned page into memory, if not already in memory, and sets 'data_' to
+  /// point to the page's buffer.
+  Status WaitForBuffer() WARN_UNUSED_RESULT {
+    DCHECK(handle_.is_pinned());
+    if (data_ != nullptr) return Status::OK();
+    const BufferPool::BufferHandle* page_buffer;
+    RETURN_IF_ERROR(handle_.GetBuffer(&page_buffer));
+    data_ = page_buffer->data();
+    return Status::OK();
+  }
+
+  /// Helper to pin the page. Caller must ensure the client has enough reservation
+  /// remaining to pin the page. Only valid to call on an unpinned page.
+  Status Pin(BufferPool::ClientHandle* client) WARN_UNUSED_RESULT {
+    DCHECK(!handle_.is_pinned());
+    return pool()->Pin(client, &handle_);
+  }
+
+  /// Helper to unpin the page.
+  void Unpin(BufferPool::ClientHandle* client) {
+    pool()->Unpin(client, &handle_);
+    data_ = nullptr;
+  }
+
+  /// Destroy the page with 'client'.
+  void Close(BufferPool::ClientHandle* client) {
+    pool()->DestroyPage(client, &handle_);
+    Reset();
   }
-  return count;
-}
+
+  int64_t valid_data_len() const { return valid_data_len_; }
+  /// Returns a pointer to the start of the page's buffer. Only valid to call if the
+  /// page is in memory.
+  uint8_t* data() const {
+    DCHECK(data_ != nullptr);
+    return data_;
+  }
+  int64_t len() const { return handle_.len(); }
+  bool is_open() const { return handle_.is_open(); }
+  bool is_pinned() const { return handle_.is_pinned(); }
+  std::string DebugString() const { return handle_.DebugString(); }
+
+ private:
+  /// Reset the page to an uninitialized state. 'handle_' must already be closed.
+  void Reset() {
+    DCHECK(!handle_.is_open());
+    valid_data_len_ = 0;
+    data_ = nullptr;
+  }
+
+  /// Helper to get the singleton buffer pool.
+  static BufferPool* pool() { return ExecEnv::GetInstance()->buffer_pool(); }
+
+  BufferPool::PageHandle handle_;
+
+  /// Length of valid data written to the page.
+  int64_t valid_data_len_;
+
+  /// Cached pointer to the buffer in 'handle_'. NULL if the page is unpinned. May be
+  /// NULL or non-NULL if the page is pinned. Can be populated by calling
+  /// WaitForBuffer() on a pinned page.
+  uint8_t* data_;
+};
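
To make the four-state machine documented above concrete, here is a minimal sketch of one legal pass through the states. It assumes a Sorter* 'sorter', a BufferPool::ClientHandle* 'client', and a 'row_size' are in scope; it illustrates the contract rather than being code from this patch:

    Sorter::Page page;                                 // Closed
    RETURN_IF_ERROR(page.Init(sorter));                // In memory; data() is valid
    uint8_t* dst = page.AllocateBytes(row_size);       // write while in memory
    page.Unpin(client);                                // Unpinned; buffer inaccessible
    RETURN_IF_ERROR(page.Pin(client));                 // Pinned, but maybe not in memory
    RETURN_IF_ERROR(page.WaitForBuffer());             // In memory again
    BufferPool::BufferHandle buf = page.ExtractBuffer(client);  // Closed again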
 
 /// A run is a sequence of tuples. The run can be sorted or unsorted (in which case the
-/// Sorter will sort it). A run comprises a sequence of fixed-length blocks containing the
+/// Sorter will sort it). A run comprises a sequence of fixed-length pages containing the
 /// tuples themselves (i.e. fixed-len slots that may contain ptrs to var-length data), and
-/// an optional sequence of var-length blocks containing the var-length data.
+/// an optional sequence of var-length pages containing the var-length data.
 ///
 /// Runs are either "initial runs" constructed from the sorter's input by 
evaluating
 /// the expressions in 'sort_tuple_exprs_' or "intermediate runs" constructed
@@ -84,7 +194,7 @@ static int NumNonNullBlocks(const 
vector<BufferedBlockMgr::Block*>& blocks) {
 /// sorted run.
 ///
 /// The expected calling sequence of functions is as follows:
-/// * Init() to initialize the run and allocate initial blocks.
+/// * Init() to initialize the run and allocate initial pages.
 /// * Add*Batch() to add batches of tuples to the run.
 /// * FinalizeInput() to signal that no more batches will be added.
 /// * If the run is unsorted, it must be sorted. After that set_sorted() must be called.
@@ -92,29 +202,30 @@ static int NumNonNullBlocks(const vector<BufferedBlockMgr::Block*>& blocks) {
 /// * PrepareRead() to allocate resources for reading the run.
 /// * GetNext() (if there was a single run) or GetNextBatch() (when merging multiple runs)
 ///   to read from the run.
-/// * Once reading is done, DeleteAllBlocks() should be called to free resources.
+/// * Once reading is done, CloseAllPages() should be called to free resources.
 class Sorter::Run {
  public:
   Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool initial_run);
 
   ~Run() {
-    DCHECK(fixed_len_blocks_.empty());
-    DCHECK(var_len_blocks_.empty());
-    DCHECK(var_len_copy_block_ == NULL);
+    DCHECK(fixed_len_pages_.empty());
+    DCHECK(var_len_pages_.empty());
+    DCHECK(!var_len_copy_page_.is_open());
   }
 
   /// Initialize the run for input rows by allocating the minimum number of required
-  /// blocks - one block for fixed-len data added to fixed_len_blocks_, one for the
-  /// initially unsorted var-len data added to var_len_blocks_, and one to copy sorted
-  /// var-len data into var_len_copy_block_.
-  Status Init();
+  /// pages - one page for fixed-len data added to fixed_len_pages_, one for the
+  /// initially unsorted var-len data added to var_len_pages_, and one to copy sorted
+  /// var-len data into var_len_copy_page_.
+  Status Init() WARN_UNUSED_RESULT;
 
   /// Add the rows from 'batch' starting at 'start_index' to the current run. Returns the
-  /// number of rows actually added in 'num_processed'. If the run is full (no more blocks
+  /// number of rows actually added in 'num_processed'. If the run is full (no more pages
   /// can be allocated), 'num_processed' may be less than the number of remaining rows in
   /// the batch. AddInputBatch() materializes the input rows using the expressions in
   /// sorter_->sort_tuple_expr_evals_, while AddIntermediateBatch() just copies rows.
-  Status AddInputBatch(RowBatch* batch, int start_index, int* num_processed) {
+  Status AddInputBatch(
+      RowBatch* batch, int start_index, int* num_processed) WARN_UNUSED_RESULT {
     DCHECK(initial_run_);
     if (has_var_len_slots_) {
       return AddBatchInternal<true, true>(batch, start_index, num_processed);
@@ -122,7 +233,9 @@ class Sorter::Run {
       return AddBatchInternal<false, true>(batch, start_index, num_processed);
     }
   }
-  Status AddIntermediateBatch(RowBatch* batch, int start_index, int* num_processed) {
+
+  Status AddIntermediateBatch(
+      RowBatch* batch, int start_index, int* num_processed) WARN_UNUSED_RESULT {
     DCHECK(!initial_run_);
     if (has_var_len_slots_) {
       return AddBatchInternal<true, false>(batch, start_index, num_processed);
@@ -133,53 +246,53 @@ class Sorter::Run {
 
   /// Called after the final call to Add*Batch() to do any bookkeeping necessary to
   /// finalize the run. Must be called before sorting or merging the run.
-  Status FinalizeInput();
+  Status FinalizeInput() WARN_UNUSED_RESULT;
 
-  /// Unpins all the blocks in a sorted run. Var-length column data is copied into new
-  /// blocks in sorted order. Pointers in the original tuples are converted to offsets
-  /// from the beginning of the sequence of var-len data blocks. Returns an error and
-  /// may leave some blocks pinned if an error is encountered in the block mgr.
-  Status UnpinAllBlocks();
+  /// Unpins all the pages in a sorted run. Var-length column data is copied into new
+  /// pages in sorted order. Pointers in the original tuples are converted to offsets
+  /// from the beginning of the sequence of var-len data pages. Returns an error and
+  /// may leave some pages pinned if an error is encountered.
+  Status UnpinAllPages() WARN_UNUSED_RESULT;
 
-  /// Deletes all blocks.
-  void DeleteAllBlocks();
+  /// Closes all pages and clears vectors of pages.
+  void CloseAllPages();
 
-  /// Prepare to read a sorted run. Pins the first block(s) in the run if the run was
+  /// Prepare to read a sorted run. Pins the first page(s) in the run if the run was
   /// previously unpinned. If the run was unpinned, try to pin the initial fixed and
-  /// var len blocks in the run. If it couldn't pin them, set pinned_all_blocks to false.
-  /// In that case, none or one of the initial blocks may be pinned and it is valid to
-  /// call PrepareRead() again to retry pinning the remainder. pinned_all_blocks is
-  /// always set to true if the run is pinned.
-  Status PrepareRead(bool* pinned_all_blocks);
+  /// var len pages in the run. If it couldn't pin them, set 'pinned' to false.
+  /// In that case, none of the initial pages will be pinned and it is valid to
+  /// call PrepareRead() again to retry pinning. 'pinned' is always set to
+  /// true if the run was pinned.
+  Status PrepareRead(bool* pinned) WARN_UNUSED_RESULT;
 
   /// Interface for merger - get the next batch of rows from this run. This run still
   /// owns the returned batch. Calls GetNext(RowBatch*, bool*).
-  Status GetNextBatch(RowBatch** sorted_batch);
+  Status GetNextBatch(RowBatch** sorted_batch) WARN_UNUSED_RESULT;
 
   /// Fill output_batch with rows from this run. If CONVERT_OFFSET_TO_PTR is true, offsets
   /// in var-length slots are converted back to pointers. Only row pointers are copied
   /// into output_batch. eos is set to true after all rows from the run are returned.
-  /// If eos is true, the returned output_batch has zero rows and has no attached blocks.
-  /// If this run was unpinned, one block (two if there are var-len slots) is pinned while
-  /// rows are filled into output_batch. The block is unpinned before the next block is
-  /// pinned, so at most one (two if there are var-len slots) block(s) will be pinned at
-  /// once. If the run was pinned, the blocks are not unpinned and each block is attached
-  /// to 'output_batch' once all rows referencing data in the block have been returned,
+  /// If eos is true, the returned output_batch has zero rows and has no attached pages.
+  /// If this run was unpinned, one page (two if there are var-len slots) is pinned while
+  /// rows are filled into output_batch. The page is unpinned before the next page is
+  /// pinned, so at most one (two if there are var-len slots) page(s) will be pinned at
+  /// once. If the run was pinned, the pages are not unpinned and each page is attached
+  /// to 'output_batch' once all rows referencing data in the page have been returned,
   /// either in the current batch or previous batches. In both pinned and unpinned cases,
-  /// all rows in output_batch will reference at most one fixed-len and one var-len block.
+  /// all rows in output_batch will reference at most one fixed-len and one var-len page.
   template <bool CONVERT_OFFSET_TO_PTR>
-  Status GetNext(RowBatch* output_batch, bool* eos);
+  Status GetNext(RowBatch* output_batch, bool* eos) WARN_UNUSED_RESULT;
 
-  /// Delete all blocks in 'runs' and clear 'runs'.
+  /// Delete all pages in 'runs' and clear 'runs'.
   static void CleanupRuns(deque<Run*>* runs) {
-    for (Run* run: *runs) {
-      run->DeleteAllBlocks();
+    for (Run* run : *runs) {
+      run->CloseAllPages();
     }
     runs->clear();
   }
 
-  /// Return total amount of fixed and var len data in run, not including blocks that
-  /// were already transferred.
+  /// Return total amount of fixed and var len data in run, not including pages that
+  /// were already transferred or closed.
   int64_t TotalBytes() const;
 
   inline bool is_pinned() const { return is_pinned_; }
@@ -196,34 +309,42 @@ class Sorter::Run {
   /// INITIAL_RUN and HAS_VAR_LEN_SLOTS are template arguments for performance and must
   /// match 'initial_run_' and 'has_var_len_slots_'.
   template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
-  Status AddBatchInternal(RowBatch* batch, int start_index, int* num_processed);
+  Status AddBatchInternal(
+      RowBatch* batch, int start_index, int* num_processed) WARN_UNUSED_RESULT;
 
-  /// Finalize the list of blocks: delete empty final blocks and unpin the previous block
+  /// Finalize the list of pages: delete empty final pages and unpin the previous page
   /// if the run is unpinned.
-  Status FinalizeBlocks(vector<BufferedBlockMgr::Block*>* blocks);
+  Status FinalizePages(vector<Page>* pages) WARN_UNUSED_RESULT;
 
   /// Collect the non-null var-len (e.g. STRING) slots from 'src' in 'var_len_values' and
   /// return the total length of all var-len values in 'total_var_len'.
-  void CollectNonNullVarSlots(Tuple* src, vector<StringValue*>* var_len_values,
-      int* total_var_len);
+  void CollectNonNullVarSlots(
+      Tuple* src, vector<StringValue*>* var_len_values, int* total_var_len);
 
-  enum AddBlockMode { KEEP_PREV_PINNED, UNPIN_PREV };
+  enum AddPageMode { KEEP_PREV_PINNED, UNPIN_PREV };
 
-  /// Try to extend the current run by a block. If 'mode' is KEEP_PREV_PINNED, try to
-  /// allocate a new block, which may fail to extend the run due to lack of memory. If
-  /// mode is 'UNPIN_PREV', unpin the previous block in block_sequence before allocating
-  /// and adding a new block - this never fails due to lack of memory.
+  /// Try to extend the current run by a page. If 'mode' is KEEP_PREV_PINNED, try to
+  /// allocate a new page, which may fail to extend the run due to lack of memory. If
+  /// mode is 'UNPIN_PREV', unpin the previous page in page_sequence before allocating
+  /// and adding a new page - this never fails due to lack of memory.
   ///
-  /// Returns an error status only if the block manager returns an error. If no error is
+  /// Returns an error status only if the buffer pool returns an error. If no error is
   /// encountered, sets 'added' to indicate whether the run was extended and returns
-  /// Status::OK(). The new block is appended to 'block_sequence'.
-  Status TryAddBlock(AddBlockMode mode, vector<BufferedBlockMgr::Block*>* block_sequence,
-      bool* added);
+  /// Status::OK(). The new page is appended to 'page_sequence'.
+  Status TryAddPage(
+      AddPageMode mode, vector<Page>* page_sequence, bool* added) WARN_UNUSED_RESULT;
+
+  /// Adds a new page to 'page_sequence'. Caller must ensure enough
+  ///
+  /// Returns an error status only if the buffer pool returns an error. If an error
+  /// is returned 'page_sequence' is left unmodified.
+  Status AddPage(vector<Page>* page_sequence) WARN_UNUSED_RESULT;
 
-  /// Advance to the next read block. If the run is pinned, has no effect. If the run
-  /// is unpinned, atomically pin the block at 'block_index' + 1 in 'blocks' and delete
-  /// the block at 'block_index'.
-  Status PinNextReadBlock(vector<BufferedBlockMgr::Block*>* blocks, int block_index);
+  /// Advance to the next read page. If the run is pinned, has no effect. If the run
+  /// is unpinned, atomically pin the page at 'page_index' + 1 in 'pages' and delete
+  /// the page at 'page_index'.
+  Status PinNextReadPage(vector<Page>* pages, int page_index) WARN_UNUSED_RESULT;
 
   /// Copy the StringValues in 'var_values' to 'dest' in order and update the StringValue
   /// ptrs in 'dest' to point to the copied data.
@@ -231,25 +352,41 @@ class Sorter::Run {
 
   /// Copy the StringValues in 'var_values' to 'dest' in order. Update the StringValue
   /// ptrs in 'dest' to contain a packed offset for the copied data comprising
-  /// block_index and the offset relative to block_start.
-  void CopyVarLenDataConvertOffset(const vector<StringValue*>& var_values,
-      int block_index, const uint8_t* block_start, uint8_t* dest);
+  /// page_index and the offset relative to page_start.
+  void CopyVarLenDataConvertOffset(const vector<StringValue*>& var_values, int page_index,
+      const uint8_t* page_start, uint8_t* dest);
 
   /// Convert encoded offsets to valid pointers in tuple with layout 'sort_tuple_desc_'.
-  /// 'tuple' is modified in-place. Returns true if the pointers refer to the block at
-  /// 'var_len_blocks_index_' and were successfully converted or false if the var len
-  /// data is in the next block, in which case 'tuple' is unmodified.
+  /// 'tuple' is modified in-place. Returns true if the pointers refer to the page at
+  /// 'var_len_pages_index_' and were successfully converted or false if the var len
+  /// data is in the next page, in which case 'tuple' is unmodified.
   bool ConvertOffsetsToPtrs(Tuple* tuple);
 
-  /// Returns true if we have var-len blocks in the run.
-  inline bool HasVarLenBlocks() const {
-    // Shouldn't have any blocks unless there are slots.
-    DCHECK(var_len_blocks_.empty() || has_var_len_slots_);
-    return !var_len_blocks_.empty();
+  /// Returns true if we have var-len pages in the run.
+  inline bool HasVarLenPages() const {
+    // Shouldn't have any pages unless there are slots.
+    DCHECK(var_len_pages_.empty() || has_var_len_slots_);
+    return !var_len_pages_.empty();
+  }
+
+  static int NumOpenPages(const vector<Page>& pages) {
+    int count = 0;
+    for (const Page& page : pages) {
+      if (page.is_open()) ++count;
+    }
+    return count;
+  }
+
+  /// Close all open pages and clear vector.
+  void DeleteAndClearPages(vector<Page>* pages) {
+    for (Page& page : *pages) {
+      if (page.is_open()) page.Close(sorter_->buffer_pool_client_);
+    }
+    pages->clear();
   }
 
   /// Parent sorter object.
-  const Sorter* sorter_;
+  Sorter* const sorter_;
 
   /// Materialized sort tuple. Input rows are materialized into 1 tuple (with descriptor
   /// sort_tuple_desc_) before sorting.
@@ -258,10 +395,10 @@ class Sorter::Run {
   /// The size in bytes of the sort tuple.
   const int sort_tuple_size_;
 
-  /// Number of tuples per block in a run. This gets multiplied with
-  /// TupleIterator::block_index_ in various places and to make sure we don't overflow the
+  /// Number of tuples per page in a run. This gets multiplied with
+  /// TupleIterator::page_index_ in various places and to make sure we don't overflow the
   /// result of that operation we make this int64_t here.
-  const int64_t block_capacity_;
+  const int64_t page_capacity_;
 
   const bool has_var_len_slots_;
 
@@ -269,7 +406,7 @@ class Sorter::Run {
   /// resulting from merging other runs.
   const bool initial_run_;
 
-  /// True if all blocks in the run are pinned. Initial runs start off pinned and
+  /// True if all pages in the run are pinned. Initial runs start off pinned and
   /// can be unpinned. Intermediate runs are always unpinned.
   bool is_pinned_;
 
@@ -281,27 +418,27 @@ class Sorter::Run {
   /// Always true for intermediate runs.
   bool is_sorted_;
 
-  /// Sequence of blocks in this run containing the fixed-length portion of the sort
+  /// Sequence of pages in this run containing the fixed-length portion of the sort
   /// tuples comprising this run. The data pointed to by the var-len slots are in
-  /// var_len_blocks_. A run can have zero blocks if no rows are appended.
-  /// If the run is sorted, the tuples in fixed_len_blocks_ will be in sorted order.
-  /// fixed_len_blocks_[i] is NULL iff it has been transferred or deleted.
-  vector<BufferedBlockMgr::Block*> fixed_len_blocks_;
+  /// var_len_pages_. A run can have zero pages if no rows are appended.
+  /// If the run is sorted, the tuples in fixed_len_pages_ will be in sorted order.
+  /// fixed_len_pages_[i] is closed iff it has been transferred or deleted.
+  vector<Page> fixed_len_pages_;
 
-  /// Sequence of blocks in this run containing the var-length data corresponding to the
-  /// var-length columns from fixed_len_blocks_. In intermediate runs, the var-len data is
+  /// Sequence of pages in this run containing the var-length data corresponding to the
+  /// var-length columns from fixed_len_pages_. In intermediate runs, the var-len data is
   /// always stored in the same order as the fixed-length tuples. In initial runs, the
   /// var-len data is initially in unsorted order, but is reshuffled into sorted order in
-  /// UnpinAllBlocks(). A run can have no var len blocks if there are no var len slots or
+  /// UnpinAllPages(). A run can have no var len pages if there are no var len slots or
   /// if all the var len data is empty or NULL.
-  /// var_len_blocks_[i] is NULL iff it has been transferred or deleted.
-  vector<BufferedBlockMgr::Block*> var_len_blocks_;
+  /// var_len_pages_[i] is closed iff it has been transferred or deleted.
+  vector<Page> var_len_pages_;
 
-  /// For initial unsorted runs, an extra pinned block is needed to reorder var-len data
-  /// into fixed order in UnpinAllBlocks(). 'var_len_copy_block_' stores this extra
-  /// block. Deleted in UnpinAllBlocks().
+  /// For initial unsorted runs, an extra pinned page is needed to reorder var-len data
+  /// into fixed order in UnpinAllPages(). 'var_len_copy_page_' stores this extra
+  /// page. Deleted in UnpinAllPages().
   /// TODO: in case of in-memory runs, this could be deleted earlier to free up memory.
-  BufferedBlockMgr::Block* var_len_copy_block_;
+  Page var_len_copy_page_;
 
   /// Number of tuples added so far to this run.
   int64_t num_tuples_;
@@ -313,18 +450,18 @@ class Sorter::Run {
   scoped_ptr<RowBatch> buffered_batch_;
 
   /// Members used when a run is read in GetNext().
-  /// The index into 'fixed_' and 'var_len_blocks_' of the blocks being read in GetNext().
-  int fixed_len_blocks_index_;
-  int var_len_blocks_index_;
+  /// The index into 'fixed_' and 'var_len_pages_' of the pages being read in GetNext().
+  int fixed_len_pages_index_;
+  int var_len_pages_index_;
 
   /// If true, the last call to GetNext() reached the end of the previous fixed or
-  /// var-len block. The next call to GetNext() must increment 'fixed_len_blocks_index_'
-  /// or 'var_len_blocks_index_'. It must also pin the next block if the run is unpinned.
-  bool end_of_fixed_len_block_;
-  bool end_of_var_len_block_;
+  /// var-len page. The next call to GetNext() must increment 'fixed_len_pages_index_'
+  /// or 'var_len_pages_index_'. It must also pin the next page if the run is unpinned.
+  bool end_of_fixed_len_page_;
+  bool end_of_var_len_page_;
 
-  /// Offset into the current fixed length data block being processed.
-  int fixed_len_block_offset_;
+  /// Offset into the current fixed length data page being processed.
+  int fixed_len_page_offset_;
 };
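
Reading the calling sequence documented on the class as one piece, a caller of Run looks roughly as follows. The batch variables and the sort step are stand-ins; this is a hedged sketch of the contract, not code from this patch:

    Sorter::Run run(sorter, sort_tuple_desc, /* initial_run */ true);
    RETURN_IF_ERROR(run.Init());
    int num_processed = 0;
    RETURN_IF_ERROR(run.AddInputBatch(batch, 0, &num_processed));
    RETURN_IF_ERROR(run.FinalizeInput());
    // ... sort the run in place, then mark it via set_sorted() ...
    bool pinned;
    RETURN_IF_ERROR(run.PrepareRead(&pinned));
    bool eos = false;
    while (!eos) RETURN_IF_ERROR(run.GetNext<false>(output_batch, &eos));
    run.CloseAllPages();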
 
 /// Helper class used to iterate over tuples in a run during sorting.
@@ -340,7 +477,7 @@ class Sorter::TupleIterator {
   /// Default constructor used for local variable. Produces invalid iterator that must
   /// be assigned before use.
   TupleIterator() : index_(-1), tuple_(NULL), buffer_start_index_(-1),
-      buffer_end_index_(-1), block_index_(-1) { }
+      buffer_end_index_(-1), page_index_(-1) { }
 
   /// Create an iterator pointing to the first tuple in the run.
   static inline TupleIterator Begin(Sorter::Run* run) { return TupleIterator(run, 0); }
@@ -351,8 +488,8 @@ class Sorter::TupleIterator {
   }
 
   /// Increments 'index_' and sets 'tuple_' to point to the next tuple in the run.
-  /// Increments 'block_index_' and advances to the next block if the next tuple is in
-  /// the next block. Can be advanced one past the last tuple in the run, but is not
+  /// Increments 'page_index_' and advances to the next page if the next tuple is in
+  /// the next page. Can be advanced one past the last tuple in the run, but is not
   /// valid to dereference 'tuple_' in that case. 'run' and 'tuple_size' are passed as
   /// arguments to avoid redundantly storing the same values in multiple iterators in
   /// perf-critical algorithms.
@@ -370,13 +507,13 @@ class Sorter::TupleIterator {
   }
 
  private:
-  // Move to the next block in the run (or do nothing if at end of run).
+  // Move to the next page in the run (or do nothing if at end of run).
   // This is the slow path for Next();
-  void NextBlock(Sorter::Run* run, int tuple_size);
+  void NextPage(Sorter::Run* run, int tuple_size);
 
-  // Move to the previous block in the run (or do nothing if at beginning of run).
+  // Move to the previous page in the run (or do nothing if at beginning of 
run).
   // This is the slow path for Prev();
-  void PrevBlock(Sorter::Run* run, int tuple_size);
+  void PrevPage(Sorter::Run* run, int tuple_size);
 
   /// Index of the current tuple in the run.
   /// Can be -1 or run->num_rows() if Next() or Prev() moves iterator outside of run.
@@ -387,15 +524,15 @@ class Sorter::TupleIterator {
   /// iterator outside of run.
   uint8_t* tuple_;
 
-  /// Indices of start and end tuples of block at block_index_. I.e. the current block
+  /// Indices of start and end tuples of page at page_index_. I.e. the current page
   /// has tuples with indices in range [buffer_start_index_, buffer_end_index).
   int64_t buffer_start_index_;
   int64_t buffer_end_index_;
 
-  /// Index into fixed_len_blocks_ of the block containing the current tuple.
-  /// If index_ is negative or past end of run, will point to the first or last block
+  /// Index into fixed_len_pages_ of the page containing the current tuple.
+  /// If index_ is negative or past end of run, will point to the first or last page
   /// in run respectively.
-  int block_index_;
+  int page_index_;
 };
 
 /// Sorts a sequence of tuples from a run in place using a provided tuple comparator.
@@ -404,16 +541,16 @@ class Sorter::TupleIterator {
 /// instance to check for cancellation during an in-memory sort.
 class Sorter::TupleSorter {
  public:
-  TupleSorter(const TupleRowComparator& comparator, int64_t block_size,
-      int tuple_size, RuntimeState* state);
+  TupleSorter(const TupleRowComparator& comparator, int64_t page_size, int tuple_size,
+      RuntimeState* state);
 
   ~TupleSorter();
 
   /// Performs a quicksort for tuples in 'run' followed by an insertion sort to
-  /// finish smaller blocks. Only valid to call if this is an initial run that has not
+  /// finish smaller ranges. Only valid to call if this is an initial run that has not
   /// yet been sorted. Returns an error status if any error is encountered or if the
   /// query is cancelled.
-  Status Sort(Run* run);
+  Status Sort(Run* run) WARN_UNUSED_RESULT;
 
  private:
   static const int INSERTION_THRESHOLD = 16;
@@ -451,7 +588,8 @@ class Sorter::TupleSorter {
 
   /// Perform an insertion sort for rows in the range [begin, end) in a run.
   /// Only valid to call for ranges of size at least 1.
-  Status InsertionSort(const TupleIterator& begin, const TupleIterator& end);
+  Status InsertionSort(
+      const TupleIterator& begin, const TupleIterator& end) WARN_UNUSED_RESULT;
 
   /// Partitions the sequence of tuples in the range [begin, end) in a run into two
   /// groups around the pivot tuple - i.e. tuples in first group are <= the pivot, and
@@ -459,12 +597,12 @@ class Sorter::TupleSorter {
   /// groups and the index to the first element in the second group is returned in 'cut'.
   /// Return an error status if any error is encountered or if the query is cancelled.
   Status Partition(TupleIterator begin, TupleIterator end, const Tuple* pivot,
-      TupleIterator* cut);
+      TupleIterator* cut) WARN_UNUSED_RESULT;
 
   /// Performs a quicksort of rows in the range [begin, end) followed by insertion sort
   /// for smaller groups of elements. Return an error status for any errors or if the
   /// query is cancelled.
-  Status SortHelper(TupleIterator begin, TupleIterator end);
+  Status SortHelper(TupleIterator begin, TupleIterator end) WARN_UNUSED_RESULT;
 
   /// Select a pivot to partition [begin, end).
   Tuple* SelectPivot(TupleIterator begin, TupleIterator end);
@@ -477,45 +615,33 @@ class Sorter::TupleSorter {
 };
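
Sort(), SortHelper(), Partition() and InsertionSort() above form the classic hybrid quicksort. A generic, self-contained sketch of that shape, using plain ints and a three-way std::partition for simplicity; the real TupleSorter partitions with its TupleRowComparator over TupleIterators and also checks for query cancellation:

    #include <algorithm>

    const int kInsertionThreshold = 16;  // mirrors INSERTION_THRESHOLD above

    static void InsertionSortSketch(int* begin, int* end) {
      for (int* i = begin + 1; i < end; ++i) {
        for (int* j = i; j > begin && *(j - 1) > *j; --j) std::swap(*(j - 1), *j);
      }
    }

    static void SortHelperSketch(int* begin, int* end) {
      if (end - begin <= kInsertionThreshold) {
        InsertionSortSketch(begin, end);  // cheap finish for small ranges
        return;
      }
      int pivot = *(begin + (end - begin) / 2);
      // Three-way partition guarantees progress even with many duplicate keys.
      int* cut1 = std::partition(begin, end, [pivot](int v) { return v < pivot; });
      int* cut2 = std::partition(cut1, end, [pivot](int v) { return v == pivot; });
      SortHelperSketch(begin, cut1);
      SortHelperSketch(cut2, end);
    }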
 
 // Sorter::Run methods
-Sorter::Run::Run(Sorter* parent, TupleDescriptor* sort_tuple_desc,
-    bool initial_run)
+Sorter::Run::Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool initial_run)
   : sorter_(parent),
     sort_tuple_desc_(sort_tuple_desc),
     sort_tuple_size_(sort_tuple_desc->byte_size()),
-    block_capacity_(parent->block_mgr_->max_block_size() / sort_tuple_size_),
+    page_capacity_(parent->page_len_ / sort_tuple_size_),
     has_var_len_slots_(sort_tuple_desc->HasVarlenSlots()),
     initial_run_(initial_run),
     is_pinned_(initial_run),
     is_finalized_(false),
     is_sorted_(!initial_run),
-    var_len_copy_block_(NULL),
-    num_tuples_(0) { }
+    num_tuples_(0) {}
 
 Status Sorter::Run::Init() {
-  BufferedBlockMgr::Block* block = NULL;
-  RETURN_IF_ERROR(
-      sorter_->block_mgr_->GetNewBlock(sorter_->block_mgr_client_, NULL, &block));
-  if (block == NULL) {
-    return sorter_->mem_tracker_->MemLimitExceeded(
-        sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "fixed"));
-  }
-  fixed_len_blocks_.push_back(block);
+  int num_to_create = 1 + has_var_len_slots_ + (has_var_len_slots_ && initial_run_);
+  int64_t required_mem = num_to_create * sorter_->page_len_;
+  if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) {
+    return Status(Substitute(
+        "Unexpected error trying to reserve $0 bytes for a sorted run: $1",
+        required_mem, sorter_->buffer_pool_client_->DebugString()));
+  }
+
+  RETURN_IF_ERROR(AddPage(&fixed_len_pages_));
   if (has_var_len_slots_) {
-    RETURN_IF_ERROR(
-        sorter_->block_mgr_->GetNewBlock(sorter_->block_mgr_client_, NULL, &block));
-    if (block == NULL) {
-      return sorter_->mem_tracker_->MemLimitExceeded(
-          sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable"));
-    }
-    var_len_blocks_.push_back(block);
+    RETURN_IF_ERROR(AddPage(&var_len_pages_));
     if (initial_run_) {
-      // Need additional var len block to reorder var len data in UnpinAllBlocks().
-      RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock(
-          sorter_->block_mgr_client_, NULL, &var_len_copy_block_));
-      if (var_len_copy_block_ == NULL) {
-        return sorter_->mem_tracker_->MemLimitExceeded(
-            sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable"));
-      }
+      // Need additional var len page to reorder var len data in UnpinAllPages().
+      RETURN_IF_ERROR(var_len_copy_page_.Init(sorter_));
     }
   }
   if (initial_run_) {
@@ -527,14 +653,15 @@ Status Sorter::Run::Init() {
 }
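
The reservation sized in Init() covers every page the function goes on to create. As a worked example of that arithmetic (the page size below is assumed purely for illustration):

    // num_to_create = 1 + has_var_len_slots_ + (has_var_len_slots_ && initial_run_)
    //   fixed-len only:            1 page  -> reserve 1 * page_len_
    //   var-len, intermediate run: 2 pages -> reserve 2 * page_len_
    //   var-len, initial run:      3 pages -> reserve 3 * page_len_ (extra copy page)
    // e.g. assuming a 2MB page_len_, an initial run with var-len slots must grow the
    // client's reservation to fit 6MB before the CreatePage() calls can proceed.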
 
 template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
-Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_processed) {
+Status Sorter::Run::AddBatchInternal(
+    RowBatch* batch, int start_index, int* num_processed) {
   DCHECK(!is_finalized_);
-  DCHECK(!fixed_len_blocks_.empty());
+  DCHECK(!fixed_len_pages_.empty());
   DCHECK_EQ(HAS_VAR_LEN_SLOTS, has_var_len_slots_);
   DCHECK_EQ(INITIAL_RUN, initial_run_);
 
   *num_processed = 0;
-  BufferedBlockMgr::Block* cur_fixed_len_block = fixed_len_blocks_.back();
+  Page* cur_fixed_len_page = &fixed_len_pages_.back();
 
   if (!INITIAL_RUN) {
     // For intermediate merges, the input row is the sort tuple.
@@ -543,13 +670,13 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_
   }
 
   /// Keep initial unsorted runs pinned in memory so we can sort them.
-  const AddBlockMode add_mode = INITIAL_RUN ? KEEP_PREV_PINNED : UNPIN_PREV;
+  const AddPageMode add_mode = INITIAL_RUN ? KEEP_PREV_PINNED : UNPIN_PREV;
 
-  // Input rows are copied/materialized into tuples allocated in fixed_len_blocks_.
-  // The variable length column data are copied into blocks stored in var_len_blocks_.
+  // Input rows are copied/materialized into tuples allocated in fixed_len_pages_.
+  // The variable length column data are copied into pages stored in var_len_pages_.
   // Input row processing is split into two loops.
-  // The inner loop processes as many input rows as will fit in cur_fixed_len_block.
-  // The outer loop allocates a new block for fixed-len data if the input batch is
+  // The inner loop processes as many input rows as will fit in cur_fixed_len_page.
+  // The outer loop allocates a new page for fixed-len data if the input batch is
   // not exhausted.
 
   // cur_input_index is the index into the input 'batch' of the current input row being
@@ -559,22 +686,23 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_
   string_values.reserve(sort_tuple_desc_->string_slots().size());
   while (cur_input_index < batch->num_rows()) {
     // tuples_remaining is the number of tuples to copy/materialize into
-    // cur_fixed_len_block.
-    int tuples_remaining = cur_fixed_len_block->BytesRemaining() / sort_tuple_size_;
+    // cur_fixed_len_page.
+    int tuples_remaining = cur_fixed_len_page->BytesRemaining() / sort_tuple_size_;
     tuples_remaining = min(batch->num_rows() - cur_input_index, tuples_remaining);
 
     for (int i = 0; i < tuples_remaining; ++i) {
       int total_var_len = 0;
       TupleRow* input_row = batch->GetRow(cur_input_index);
-      Tuple* new_tuple = cur_fixed_len_block->Allocate<Tuple>(sort_tuple_size_);
+      Tuple* new_tuple =
+          reinterpret_cast<Tuple*>(cur_fixed_len_page->AllocateBytes(sort_tuple_size_));
       if (INITIAL_RUN) {
         new_tuple->MaterializeExprs<HAS_VAR_LEN_SLOTS, true>(input_row,
             *sort_tuple_desc_, sorter_->sort_tuple_expr_evals_, NULL,
             &string_values, &total_var_len);
-        if (total_var_len > sorter_->block_mgr_->max_block_size()) {
-          return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, Substitute(
-              "Variable length data in a single tuple larger than block size 
$0 > $1",
-              total_var_len, sorter_->block_mgr_->max_block_size())));
+        if (total_var_len > sorter_->page_len_) {
+          return Status(TErrorCode::MAX_ROW_SIZE,
+              PrettyPrinter::Print(total_var_len, TUnit::BYTES), 
sorter_->node_id_,
+              PrettyPrinter::Print(0, TUnit::BYTES));
         }
       } else {
         memcpy(new_tuple, input_row->GetTuple(0), sort_tuple_size_);
@@ -584,17 +712,17 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_
       }
 
       if (HAS_VAR_LEN_SLOTS) {
-        DCHECK_GT(var_len_blocks_.size(), 0);
-        BufferedBlockMgr::Block* cur_var_len_block = var_len_blocks_.back();
-        if (cur_var_len_block->BytesRemaining() < total_var_len) {
+        DCHECK_GT(var_len_pages_.size(), 0);
+        Page* cur_var_len_page = &var_len_pages_.back();
+        if (cur_var_len_page->BytesRemaining() < total_var_len) {
           bool added;
-          RETURN_IF_ERROR(TryAddBlock(add_mode, &var_len_blocks_, &added));
+          RETURN_IF_ERROR(TryAddPage(add_mode, &var_len_pages_, &added));
           if (added) {
-            cur_var_len_block = var_len_blocks_.back();
+            cur_var_len_page = &var_len_pages_.back();
           } else {
-            // There was not enough space in the last var-len block for this tuple, and
+            // There was not enough space in the last var-len page for this tuple, and
             // the run could not be extended. Return the fixed-len allocation and exit.
-            cur_fixed_len_block->ReturnAllocation(sort_tuple_size_);
+            cur_fixed_len_page->FreeBytes(sort_tuple_size_);
             return Status::OK();
           }
         }
@@ -605,13 +733,13 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_
           DCHECK(new_tuple->IsNull(coll_slot->null_indicator_offset()));
         }
 
-        uint8_t* var_data_ptr = cur_var_len_block->Allocate<uint8_t>(total_var_len);
+        uint8_t* var_data_ptr = cur_var_len_page->AllocateBytes(total_var_len);
         if (INITIAL_RUN) {
           CopyVarLenData(string_values, var_data_ptr);
         } else {
-          DCHECK_EQ(var_len_blocks_.back(), cur_var_len_block);
-          CopyVarLenDataConvertOffset(string_values, var_len_blocks_.size() - 1,
-              reinterpret_cast<uint8_t*>(cur_var_len_block->buffer()), var_data_ptr);
+          DCHECK_EQ(&var_len_pages_.back(), cur_var_len_page);
+          CopyVarLenDataConvertOffset(string_values, var_len_pages_.size() - 1,
+              cur_var_len_page->data(), var_data_ptr);
         }
       }
       ++num_tuples_;
@@ -619,13 +747,13 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_
       ++cur_input_index;
     }
 
-    // If there are still rows left to process, get a new block for the fixed-length
+    // If there are still rows left to process, get a new page for the fixed-length
     // tuples. If the run is already too long, return.
     if (cur_input_index < batch->num_rows()) {
       bool added;
-      RETURN_IF_ERROR(TryAddBlock(add_mode, &fixed_len_blocks_, &added));
+      RETURN_IF_ERROR(TryAddPage(add_mode, &fixed_len_pages_, &added));
       if (!added) return Status::OK();
-      cur_fixed_len_block = fixed_len_blocks_.back();
+      cur_fixed_len_page = &fixed_len_pages_.back();
     }
   }
   return Status::OK();
@@ -634,158 +762,146 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_
 Status Sorter::Run::FinalizeInput() {
   DCHECK(!is_finalized_);
 
-  RETURN_IF_ERROR(FinalizeBlocks(&fixed_len_blocks_));
+  RETURN_IF_ERROR(FinalizePages(&fixed_len_pages_));
   if (has_var_len_slots_) {
-    RETURN_IF_ERROR(FinalizeBlocks(&var_len_blocks_));
+    RETURN_IF_ERROR(FinalizePages(&var_len_pages_));
   }
   is_finalized_ = true;
   return Status::OK();
 }
 
-Status Sorter::Run::FinalizeBlocks(vector<BufferedBlockMgr::Block*>* blocks) {
-  DCHECK_GT(blocks->size(), 0);
-  BufferedBlockMgr::Block* last_block = blocks->back();
-  if (last_block->valid_data_len() > 0) {
+Status Sorter::Run::FinalizePages(vector<Page>* pages) {
+  DCHECK_GT(pages->size(), 0);
+  Page* last_page = &pages->back();
+  if (last_page->valid_data_len() > 0) {
     DCHECK_EQ(initial_run_, is_pinned_);
     if (!is_pinned_) {
-      // Unpin the last block of this unpinned run. We've finished writing the run so
-      // all blocks in the run can now be unpinned.
-      RETURN_IF_ERROR(last_block->Unpin());
+      // Unpin the last page of this unpinned run. We've finished writing the run so
+      // all pages in the run can now be unpinned.
+      last_page->Unpin(sorter_->buffer_pool_client_);
     }
   } else {
-    last_block->Delete();
-    blocks->pop_back();
+    last_page->Close(sorter_->buffer_pool_client_);
+    pages->pop_back();
   }
   return Status::OK();
 }
 
-void Sorter::Run::DeleteAllBlocks() {
-  DeleteAndClearBlocks(&fixed_len_blocks_);
-  DeleteAndClearBlocks(&var_len_blocks_);
-  if (var_len_copy_block_ != NULL) var_len_copy_block_->Delete();
-  var_len_copy_block_ = NULL;
+void Sorter::Run::CloseAllPages() {
+  DeleteAndClearPages(&fixed_len_pages_);
+  DeleteAndClearPages(&var_len_pages_);
+  if (var_len_copy_page_.is_open()) {
+    var_len_copy_page_.Close(sorter_->buffer_pool_client_);
+  }
 }
 
-Status Sorter::Run::UnpinAllBlocks() {
+Status Sorter::Run::UnpinAllPages() {
   DCHECK(is_sorted_);
   DCHECK(initial_run_);
   DCHECK(is_pinned_);
   DCHECK(is_finalized_);
-  // A list of var len blocks to replace 'var_len_blocks_'. Note that after we are done
-  // we may have a different number of blocks, because internal fragmentation may leave
-  // slightly different amounts of wasted space at the end of each block.
-  // We need to be careful to clean up these blocks if we run into an error in this method.
-  vector<BufferedBlockMgr::Block*> sorted_var_len_blocks;
-  sorted_var_len_blocks.reserve(var_len_blocks_.size());
+  // A list of var len pages to replace 'var_len_pages_'. Note that after we are done
+  // we may have a different number of pages, because internal fragmentation may leave
+  // slightly different amounts of wasted space at the end of each page.
+  // We need to be careful to clean up these pages if we run into an error in this method.
+  vector<Page> sorted_var_len_pages;
+  sorted_var_len_pages.reserve(var_len_pages_.size());
 
   vector<StringValue*> string_values;
   int total_var_len;
   string_values.reserve(sort_tuple_desc_->string_slots().size());
-  BufferedBlockMgr::Block* cur_sorted_var_len_block = NULL;
-  if (HasVarLenBlocks()) {
-    DCHECK(var_len_copy_block_ != NULL);
-    sorted_var_len_blocks.push_back(var_len_copy_block_);
-    cur_sorted_var_len_block = var_len_copy_block_;
-    // Set var_len_copy_block_ to NULL since it was moved to var_len_blocks_.
-    var_len_copy_block_ = NULL;
+  Page* cur_sorted_var_len_page = NULL;
+  if (HasVarLenPages()) {
+    DCHECK(var_len_copy_page_.is_open());
+    sorted_var_len_pages.push_back(move(var_len_copy_page_));
+    cur_sorted_var_len_page = &sorted_var_len_pages.back();
   } else if (has_var_len_slots_) {
-    // If we don't have any var-len blocks, clean up the copy block.
-    DCHECK(var_len_copy_block_ != NULL);
-    var_len_copy_block_->Delete();
-    var_len_copy_block_ = NULL;
+    // If we don't have any var-len pages, clean up the copy page.
+    DCHECK(var_len_copy_page_.is_open());
+    var_len_copy_page_.Close(sorter_->buffer_pool_client_);
   } else {
-    DCHECK(var_len_copy_block_ == NULL);
+    DCHECK(!var_len_copy_page_.is_open());
   }
 
   Status status;
-  for (int i = 0; i < fixed_len_blocks_.size(); ++i) {
-    BufferedBlockMgr::Block* cur_fixed_block = fixed_len_blocks_[i];
+  for (int i = 0; i < fixed_len_pages_.size(); ++i) {
+    Page* cur_fixed_page = &fixed_len_pages_[i];
     // Skip converting the pointers if no var-len slots, or if all the values are null
     // or zero-length. This will possibly leave zero-length pointers pointing to
     // arbitrary memory, but zero-length data cannot be dereferenced anyway.
-    if (HasVarLenBlocks()) {
-      for (int block_offset = 0; block_offset < cur_fixed_block->valid_data_len();
-          block_offset += sort_tuple_size_) {
-        Tuple* cur_tuple =
-            reinterpret_cast<Tuple*>(cur_fixed_block->buffer() + block_offset);
+    if (HasVarLenPages()) {
+      for (int page_offset = 0; page_offset < cur_fixed_page->valid_data_len();
+           page_offset += sort_tuple_size_) {
+        Tuple* cur_tuple = reinterpret_cast<Tuple*>(cur_fixed_page->data() + page_offset);
         CollectNonNullVarSlots(cur_tuple, &string_values, &total_var_len);
-        DCHECK(cur_sorted_var_len_block != NULL);
-        if (cur_sorted_var_len_block->BytesRemaining() < total_var_len) {
+        DCHECK(cur_sorted_var_len_page->is_open());
+        if (cur_sorted_var_len_page->BytesRemaining() < total_var_len) {
           bool added;
-          status = TryAddBlock(UNPIN_PREV, &sorted_var_len_blocks, &added);
-          if (!status.ok()) goto cleanup_blocks;
-          DCHECK(added) << "TryAddBlock() with UNPIN_PREV should not fail to 
add";
-          cur_sorted_var_len_block = sorted_var_len_blocks.back();
+          status = TryAddPage(UNPIN_PREV, &sorted_var_len_pages, &added);
+          if (!status.ok()) goto cleanup_pages;
+          DCHECK(added) << "TryAddPage() with UNPIN_PREV should not fail to 
add";
+          cur_sorted_var_len_page = &sorted_var_len_pages.back();
         }
-        uint8_t* var_data_ptr =
-            cur_sorted_var_len_block->Allocate<uint8_t>(total_var_len);
-        DCHECK_EQ(sorted_var_len_blocks.back(), cur_sorted_var_len_block);
-        CopyVarLenDataConvertOffset(string_values, sorted_var_len_blocks.size() - 1,
-            reinterpret_cast<uint8_t*>(cur_sorted_var_len_block->buffer()), var_data_ptr);
+        uint8_t* var_data_ptr = cur_sorted_var_len_page->AllocateBytes(total_var_len);
+        DCHECK_EQ(&sorted_var_len_pages.back(), cur_sorted_var_len_page);
+        CopyVarLenDataConvertOffset(string_values, sorted_var_len_pages.size() - 1,
+            cur_sorted_var_len_page->data(), var_data_ptr);
       }
     }
-    status = cur_fixed_block->Unpin();
-    if (!status.ok()) goto cleanup_blocks;
+    cur_fixed_page->Unpin(sorter_->buffer_pool_client_);
   }
 
-  if (HasVarLenBlocks()) {
-    DCHECK_GT(sorted_var_len_blocks.back()->valid_data_len(), 0);
-    status = sorted_var_len_blocks.back()->Unpin();
-    if (!status.ok()) goto cleanup_blocks;
+  if (HasVarLenPages()) {
+    DCHECK_GT(sorted_var_len_pages.back().valid_data_len(), 0);
+    sorted_var_len_pages.back().Unpin(sorter_->buffer_pool_client_);
   }
 
-  // Clear var_len_blocks_ and replace with it with the contents of sorted_var_len_blocks
-  DeleteAndClearBlocks(&var_len_blocks_);
-  sorted_var_len_blocks.swap(var_len_blocks_);
+  // Clear var_len_pages_ and replace it with the contents of sorted_var_len_pages
+  DeleteAndClearPages(&var_len_pages_);
+  sorted_var_len_pages.swap(var_len_pages_);
   is_pinned_ = false;
   sorter_->spilled_runs_counter_->Add(1);
   return Status::OK();
 
-cleanup_blocks:
-  DeleteAndClearBlocks(&sorted_var_len_blocks);
+cleanup_pages:
+  DeleteAndClearPages(&sorted_var_len_pages);
   return status;
 }
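
The conversion performed above (and reversed by ConvertOffsetsToPtrs()) stores a var-len page index plus a byte offset in the pointer-sized slot of each StringValue. One plausible packing, as a sketch; the bit layout shown is an assumption for illustration, and the authoritative encoding is defined by CopyVarLenDataConvertOffset() elsewhere in this file:

    // Sketch: page index in the high 32 bits, byte offset in the low 32 bits.
    static uint64_t PackPageOffset(uint32_t page_index, uint32_t page_offset) {
      return (static_cast<uint64_t>(page_index) << 32) | page_offset;
    }
    static void UnpackPageOffset(
        uint64_t packed, uint32_t* page_index, uint32_t* page_offset) {
      *page_index = static_cast<uint32_t>(packed >> 32);
      *page_offset = static_cast<uint32_t>(packed & 0xffffffff);
    }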
 
-Status Sorter::Run::PrepareRead(bool* pinned_all_blocks) {
+Status Sorter::Run::PrepareRead(bool* pinned) {
   DCHECK(is_finalized_);
   DCHECK(is_sorted_);
 
-  fixed_len_blocks_index_ = 0;
-  fixed_len_block_offset_ = 0;
-  var_len_blocks_index_ = 0;
-  end_of_fixed_len_block_ = end_of_var_len_block_ = fixed_len_blocks_.empty();
+  fixed_len_pages_index_ = 0;
+  fixed_len_page_offset_ = 0;
+  var_len_pages_index_ = 0;
+  end_of_fixed_len_page_ = end_of_var_len_page_ = fixed_len_pages_.empty();
   num_tuples_returned_ = 0;
 
   buffered_batch_.reset(new RowBatch(
       sorter_->output_row_desc_, sorter_->state_->batch_size(), sorter_->mem_tracker_));
 
-  // If the run is pinned, all blocks are already pinned, so we're ready to read.
+  // If the run is pinned, all pages are already pinned, so we're ready to read.
   if (is_pinned_) {
-    *pinned_all_blocks = true;
+    *pinned = true;
     return Status::OK();
   }
 
-  // Attempt to pin the first fixed and var-length blocks. In either case, pinning may
-  // fail if the number of reserved blocks is oversubscribed, see IMPALA-1590.
-  if (fixed_len_blocks_.size() > 0) {
-    bool pinned;
-    RETURN_IF_ERROR(fixed_len_blocks_[0]->Pin(&pinned));
-    if (!pinned) {
-      *pinned_all_blocks = false;
-      return Status::OK();
-    }
+  int num_to_pin = (fixed_len_pages_.size() > 0 ? 1 : 0) + (HasVarLenPages() ? 1 : 0);
+  int64_t required_mem = num_to_pin * sorter_->page_len_;
+  if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) {
+    *pinned = false;
+    return Status::OK();
   }
 
-  if (HasVarLenBlocks()) {
-    bool pinned;
-    RETURN_IF_ERROR(var_len_blocks_[0]->Pin(&pinned));
-    if (!pinned) {
-      *pinned_all_blocks = false;
-      return Status::OK();
-    }
+  // Attempt to pin the first fixed and var-length pages.
+  if (fixed_len_pages_.size() > 0) {
+    RETURN_IF_ERROR(fixed_len_pages_[0].Pin(sorter_->buffer_pool_client_));
   }
-
-  *pinned_all_blocks = true;
+  if (HasVarLenPages()) {
+    RETURN_IF_ERROR(var_len_pages_[0].Pin(sorter_->buffer_pool_client_));
+  }
+  *pinned = true;
   return Status::OK();
 }
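
Per the contract documented on PrepareRead(), *pinned == false is a soft failure: nothing is left pinned and the call may simply be repeated once reservation frees up. A sketch of caller-side handling; 'ReleaseSomeReservation' is a hypothetical placeholder, not an API from this patch:

    bool pinned = false;
    RETURN_IF_ERROR(run->PrepareRead(&pinned));
    if (!pinned) {
      ReleaseSomeReservation();  // hypothetical: free reservation held elsewhere
      RETURN_IF_ERROR(run->PrepareRead(&pinned));  // valid to retry per the contract
    }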
 
@@ -794,7 +910,7 @@ Status Sorter::Run::GetNextBatch(RowBatch** output_batch) {
   buffered_batch_->Reset();
   // Fill more rows into buffered_batch_.
   bool eos;
-  if (HasVarLenBlocks() && !is_pinned_) {
+  if (HasVarLenPages() && !is_pinned_) {
     RETURN_IF_ERROR(GetNext<true>(buffered_batch_.get(), &eos));
   } else {
     RETURN_IF_ERROR(GetNext<false>(buffered_batch_.get(), &eos));
@@ -804,7 +920,7 @@ Status Sorter::Run::GetNextBatch(RowBatch** output_batch) {
     // Setting output_batch to NULL signals eos to the caller, so GetNext() is not
     // allowed to attach resources to the batch on eos.
     DCHECK_EQ(buffered_batch_->num_rows(), 0);
-    DCHECK_EQ(buffered_batch_->num_blocks(), 0);
+    DCHECK_EQ(buffered_batch_->num_buffers(), 0);
     *output_batch = NULL;
     return Status::OK();
   }
@@ -815,122 +931,130 @@ Status Sorter::Run::GetNextBatch(RowBatch** output_batch) {
 template <bool CONVERT_OFFSET_TO_PTR>
 Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
   // Var-len offsets are converted only when reading var-len data from unpinned runs.
-  // We shouldn't convert var len offsets if there are no blocks, since in that case
-  // they must all be null or zero-length strings, which don't point into a valid block.
-  DCHECK_EQ(CONVERT_OFFSET_TO_PTR, HasVarLenBlocks() && !is_pinned_);
+  // We shouldn't convert var len offsets if there are no pages, since in that case
+  // they must all be null or zero-length strings, which don't point into a valid page.
+  DCHECK_EQ(CONVERT_OFFSET_TO_PTR, HasVarLenPages() && !is_pinned_);
 
-  if (end_of_fixed_len_block_ &&
-      fixed_len_blocks_index_ >= static_cast<int>(fixed_len_blocks_.size()) - 1) {
+  if (end_of_fixed_len_page_
+      && fixed_len_pages_index_ >= static_cast<int>(fixed_len_pages_.size()) - 1) {
     if (is_pinned_) {
-      // All blocks were previously attached to output batches. GetNextBatch() assumes
+      // All pages were previously attached to output batches. GetNextBatch() assumes
       // that we don't attach resources to the batch on eos.
-      DCHECK_EQ(NumNonNullBlocks(fixed_len_blocks_), 0);
-      DCHECK_EQ(NumNonNullBlocks(var_len_blocks_), 0);
+      DCHECK_EQ(NumOpenPages(fixed_len_pages_), 0);
+      DCHECK_EQ(NumOpenPages(var_len_pages_), 0);
 
-      // Flush resources in case we are in a subplan and need to allocate more blocks
+      // Flush resources in case we are in a subplan and need to allocate more pages
       // when the node is reopened.
       output_batch->MarkFlushResources();
     } else {
       // We held onto the last fixed or var len blocks without transferring them to the
       // caller. We signalled MarkNeedsDeepCopy() to the caller, so we can safely delete
       // them now to free memory.
-      if (!fixed_len_blocks_.empty()) DCHECK_EQ(NumNonNullBlocks(fixed_len_blocks_), 1);
-      if (!var_len_blocks_.empty()) DCHECK_EQ(NumNonNullBlocks(var_len_blocks_), 1);
+      if (!fixed_len_pages_.empty()) DCHECK_EQ(NumOpenPages(fixed_len_pages_), 1);
+      if (!var_len_pages_.empty()) DCHECK_EQ(NumOpenPages(var_len_pages_), 1);
     }
-    DeleteAllBlocks();
+    CloseAllPages();
     *eos = true;
     DCHECK_EQ(num_tuples_returned_, num_tuples_);
     return Status::OK();
   }
 
-  // Advance the fixed or var len block if we reached the end in the previous call to
+  // Advance the fixed or var len page if we reached the end in the previous call to
   // GetNext().
-  if (end_of_fixed_len_block_) {
-    RETURN_IF_ERROR(PinNextReadBlock(&fixed_len_blocks_, fixed_len_blocks_index_));
-    ++fixed_len_blocks_index_;
-    fixed_len_block_offset_ = 0;
-    end_of_fixed_len_block_ = false;
-  }
-  if (end_of_var_len_block_) {
-    RETURN_IF_ERROR(PinNextReadBlock(&var_len_blocks_, var_len_blocks_index_));
-    ++var_len_blocks_index_;
-    end_of_var_len_block_ = false;
-  }
-
-  // Fills rows into the output batch until a block boundary is reached.
-  BufferedBlockMgr::Block* fixed_len_block = fixed_len_blocks_[fixed_len_blocks_index_];
-  DCHECK(fixed_len_block != NULL);
-  while (!output_batch->AtCapacity() &&
-      fixed_len_block_offset_ < fixed_len_block->valid_data_len()) {
-    DCHECK(fixed_len_block != NULL);
-    Tuple* input_tuple = reinterpret_cast<Tuple*>(
-        fixed_len_block->buffer() + fixed_len_block_offset_);
+  if (end_of_fixed_len_page_) {
+    RETURN_IF_ERROR(PinNextReadPage(&fixed_len_pages_, fixed_len_pages_index_));
+    ++fixed_len_pages_index_;
+    fixed_len_page_offset_ = 0;
+    end_of_fixed_len_page_ = false;
+  }
+  if (end_of_var_len_page_) {
+    RETURN_IF_ERROR(PinNextReadPage(&var_len_pages_, var_len_pages_index_));
+    ++var_len_pages_index_;
+    end_of_var_len_page_ = false;
+  }
+
+  // Fills rows into the output batch until a page boundary is reached.
+  Page* fixed_len_page = &fixed_len_pages_[fixed_len_pages_index_];
+  DCHECK(fixed_len_page != NULL);
+
+  // Ensure we have a reference to the fixed-length page's buffer.
+  RETURN_IF_ERROR(fixed_len_page->WaitForBuffer());
+
+  // If we're converting offsets into unpinned var-len pages, make sure the
+  // current var-len page is in memory.
+  if (CONVERT_OFFSET_TO_PTR && HasVarLenPages()) {
+    RETURN_IF_ERROR(var_len_pages_[var_len_pages_index_].WaitForBuffer());
+  }
+
+  while (!output_batch->AtCapacity()
+      && fixed_len_page_offset_ < fixed_len_page->valid_data_len()) {
+    DCHECK(fixed_len_page != NULL);
+    Tuple* input_tuple =
+        reinterpret_cast<Tuple*>(fixed_len_page->data() + fixed_len_page_offset_);
 
     if (CONVERT_OFFSET_TO_PTR && !ConvertOffsetsToPtrs(input_tuple)) {
       DCHECK(!is_pinned_);
-      // The var-len data is in the next block. We are done with the current block, so
-      // return rows we've accumulated so far and advance to the next block in the next
-      // GetNext() call. This is needed for the unpinned case where we need to exchange
-      // this block for the next in the next GetNext() call. So therefore we must hold
-      // onto the current var-len block and signal to the caller that the block is going
+      // The var-len data is in the next page. We are done with the current page, so
+      // return rows we've accumulated so far and advance to the next page in the next
+      // GetNext() call. This is needed for the unpinned case where we will exchange
+      // this page for the next in the next GetNext() call. So therefore we must hold
+      // onto the current var-len page and signal to the caller that the page is going
       // to be deleted.
       output_batch->MarkNeedsDeepCopy();
-      end_of_var_len_block_ = true;
+      end_of_var_len_page_ = true;
       break;
     }
     output_batch->GetRow(output_batch->AddRow())->SetTuple(0, input_tuple);
     output_batch->CommitLastRow();
-    fixed_len_block_offset_ += sort_tuple_size_;
+    fixed_len_page_offset_ += sort_tuple_size_;
     ++num_tuples_returned_;
   }
 
-  if (fixed_len_block_offset_ >= fixed_len_block->valid_data_len()) {
-    // Reached the block boundary, need to move to the next block.
+  if (fixed_len_page_offset_ >= fixed_len_page->valid_data_len()) {
+    // Reached the page boundary, need to move to the next page.
     if (is_pinned_) {
-      // Attach block to batch. Caller can delete the block when it wants to.
-      output_batch->AddBlock(fixed_len_blocks_[fixed_len_blocks_index_],
+      BufferPool::ClientHandle* client = sorter_->buffer_pool_client_;
+      // Attach page to batch. Caller can delete the page when it wants to.
+      output_batch->AddBuffer(client,
+          fixed_len_pages_[fixed_len_pages_index_].ExtractBuffer(client),
           RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-      fixed_len_blocks_[fixed_len_blocks_index_] = NULL;
 
-      // Attach the var-len blocks at eos once no more rows will reference the blocks.
-      if (fixed_len_blocks_index_ == fixed_len_blocks_.size() - 1) {
-        for (BufferedBlockMgr::Block* var_len_block: var_len_blocks_) {
-          DCHECK(var_len_block != NULL);
-          output_batch->AddBlock(var_len_block, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+      // Attach the var-len pages at eos once no more rows will reference the pages.
+      if (fixed_len_pages_index_ == fixed_len_pages_.size() - 1) {
+        for (Page& var_len_page : var_len_pages_) {
+          DCHECK(var_len_page.is_open());
+          output_batch->AddBuffer(client, var_len_page.ExtractBuffer(client),
+              RowBatch::FlushMode::NO_FLUSH_RESOURCES);
         }
-        var_len_blocks_.clear();
+        var_len_pages_.clear();
       }
     } else {
-      // To iterate over unpinned runs, we need to exchange this block for the next
-      // in the next GetNext() call, so we need to hold onto the block and signal to
-      // the caller that the block is going to be deleted.
+      // To iterate over unpinned runs, we need to exchange this page for the next
+      // in the next GetNext() call, so we need to hold onto the page and signal to
+      // the caller that the page is going to be deleted.
       output_batch->MarkNeedsDeepCopy();
     }
-    end_of_fixed_len_block_ = true;
+    end_of_fixed_len_page_ = true;
   }
   *eos = false;
   return Status::OK();
 }
 
-Status Sorter::Run::PinNextReadBlock(vector<BufferedBlockMgr::Block*>* blocks,
-    int block_index) {
-  DCHECK_GE(block_index, 0);
-  DCHECK_LT(block_index, blocks->size() - 1);
-  BufferedBlockMgr::Block* curr_block = (*blocks)[block_index];
-  BufferedBlockMgr::Block* next_block = (*blocks)[block_index + 1];
-  DCHECK_EQ(is_pinned_, next_block->is_pinned());
+Status Sorter::Run::PinNextReadPage(vector<Page>* pages, int page_index) {
+  DCHECK_GE(page_index, 0);
+  DCHECK_LT(page_index, pages->size() - 1);
+  Page* curr_page = &(*pages)[page_index];
+  Page* next_page = &(*pages)[page_index + 1];
+  DCHECK_EQ(is_pinned_, next_page->is_pinned());
   if (is_pinned_) {
-    // The current block was attached to a batch and 'next_block' is already pinned.
-    DCHECK(curr_block == NULL);
+    // The current page was attached to a batch and 'next_page' is already pinned.
+    DCHECK(!curr_page->is_open());
     return Status::OK();
   }
-  bool pinned;
-  // Atomically delete the previous block and pin this one. Should not fail due to lack
-  // of memory. Pin() deletes the block even in error cases, so we need to remove it from
-  // the vector first to avoid an inconsistent state.
-  (*blocks)[block_index] = NULL;
-  RETURN_IF_ERROR(next_block->Pin(&pinned, curr_block, false));
-  DCHECK(pinned) << "Atomic delete and pin should not fail without error.";
+  // Close the previous page to free memory and pin the next page. Should always succeed
+  // since the pages are the same size.
+  curr_page->Close(sorter_->buffer_pool_client_);
+  RETURN_IF_ERROR(next_page->Pin(sorter_->buffer_pool_client_));
   return Status::OK();
 }
 
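The exchange above relies on every page in a run having the same length: closing the
current page frees exactly one page's worth of reservation, which the Pin() of the next
page then consumes. A minimal standalone sketch of that invariant, not part of the
patch, with a hypothetical counter standing in for BufferPool::ClientHandle:

    #include <cassert>
    #include <cstdint>

    // Hypothetical stand-in for the buffer pool client's reservation state.
    struct FakeReservation {
      int64_t used;   // bytes backing currently pinned pages
      int64_t limit;  // total reservation available
    };

    // Mirrors PinNextReadPage(): Close(curr) frees one page, Pin(next) takes one.
    void ExchangeEqualSizedPages(FakeReservation* r, int64_t page_len) {
      r->used -= page_len;                     // curr_page->Close(client)
      assert(r->used + page_len <= r->limit);  // the pin cannot exceed the limit
      r->used += page_len;                     // next_page->Pin(client)
    }

    int main() {
      FakeReservation r{8 << 20, 8 << 20};  // one hypothetical 8MB page pinned
      ExchangeEqualSizedPages(&r, 8 << 20);
      assert(r.used == r.limit);            // still exactly one page pinned
      return 0;
    }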
@@ -948,28 +1072,29 @@ void Sorter::Run::CollectNonNullVarSlots(Tuple* src,
   }
 }
 
-Status Sorter::Run::TryAddBlock(AddBlockMode mode,
-    vector<BufferedBlockMgr::Block*>* block_sequence, bool* added) {
-  DCHECK(!block_sequence->empty());
-  BufferedBlockMgr::Block* prev_block;
+Status Sorter::Run::TryAddPage(
+    AddPageMode mode, vector<Page>* page_sequence, bool* added) {
+  DCHECK(!page_sequence->empty());
   if (mode == KEEP_PREV_PINNED) {
-    prev_block = NULL;
+    if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(sorter_->page_len_)) {
+      *added = false;
+      return Status::OK();
+    }
   } else {
     DCHECK(mode == UNPIN_PREV);
-    // Swap the prev block with the next, to guarantee success.
-    prev_block = block_sequence->back();
+    // Unpin the prev page to free up the memory required to pin the next page.
+    page_sequence->back().Unpin(sorter_->buffer_pool_client_);
   }
 
-  BufferedBlockMgr::Block* new_block;
-  RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock(
-      sorter_->block_mgr_client_, prev_block, &new_block));
-  if (new_block != NULL) {
-    *added = true;
-    block_sequence->push_back(new_block);
-  } else {
-    DCHECK_EQ(mode, KEEP_PREV_PINNED);
-    *added = false;
-  }
+  RETURN_IF_ERROR(AddPage(page_sequence));
+  *added = true;
+  return Status::OK();
+}
+
+Status Sorter::Run::AddPage(vector<Page>* page_sequence) {
+  Page new_page;
+  RETURN_IF_ERROR(new_page.Init(sorter_));
+  page_sequence->push_back(move(new_page));
   return Status::OK();
 }
 
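A standalone sketch, not part of the patch, of the two AddPageMode contracts in
TryAddPage(): with KEEP_PREV_PINNED the new page needs fresh reservation and may fail
cleanly with *added == false, while UNPIN_PREV recycles the previous page's reservation
and is expected to succeed. The reservation client is modelled here by a hypothetical
counter:

    #include <cassert>
    #include <cstdint>

    enum AddPageMode { KEEP_PREV_PINNED, UNPIN_PREV };

    // Hypothetical stand-in for BufferPool::ClientHandle's reservation logic.
    struct FakeClient {
      int64_t reservation;  // bytes currently reserved for pinned pages
      int64_t limit;        // maximum reservation available
      bool IncreaseReservationToFit(int64_t len) {
        if (reservation + len > limit) return false;
        reservation += len;
        return true;
      }
    };

    // Returns whether a page of 'page_len' bytes could be added in 'mode'.
    bool TryAddPageSketch(FakeClient* client, AddPageMode mode, int64_t page_len) {
      if (mode == KEEP_PREV_PINNED) {
        // Previous and new page both stay pinned: needs extra reservation.
        return client->IncreaseReservationToFit(page_len);
      }
      // UNPIN_PREV: unpinning the previous page frees its reservation, so the
      // new page can always reuse it.
      return true;
    }

    int main() {
      FakeClient client{16, 16};  // already at the reservation limit
      assert(!TryAddPageSketch(&client, KEEP_PREV_PINNED, 8));  // cannot grow
      assert(TryAddPageSketch(&client, UNPIN_PREV, 8));  // recycles reservation
      return 0;
    }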
@@ -983,27 +1108,26 @@ void Sorter::Run::CopyVarLenData(const vector<StringValue*>& string_values,
 }
 
 void Sorter::Run::CopyVarLenDataConvertOffset(const vector<StringValue*>& string_values,
-    int block_index, const uint8_t* block_start, uint8_t* dest) {
-  DCHECK_GE(block_index, 0);
-  DCHECK_GE(dest - block_start, 0);
+    int page_index, const uint8_t* page_start, uint8_t* dest) {
+  DCHECK_GE(page_index, 0);
+  DCHECK_GE(dest - page_start, 0);
 
-  for (StringValue* string_val: string_values) {
+  for (StringValue* string_val : string_values) {
     memcpy(dest, string_val->ptr, string_val->len);
-    DCHECK_LE(dest - block_start, sorter_->block_mgr_->max_block_size());
-    DCHECK_LE(dest - block_start, INT_MAX);
-    int block_offset = dest - block_start;
-    uint64_t packed_offset =
-        (static_cast<uint64_t>(block_index) << 32) | block_offset;
+    DCHECK_LE(dest - page_start, sorter_->page_len_);
+    DCHECK_LE(dest - page_start, numeric_limits<uint32_t>::max());
+    uint32_t page_offset = dest - page_start;
+    uint64_t packed_offset = (static_cast<uint64_t>(page_index) << 32) | page_offset;
     string_val->ptr = reinterpret_cast<char*>(packed_offset);
     dest += string_val->len;
   }
 }
 
 bool Sorter::Run::ConvertOffsetsToPtrs(Tuple* tuple) {
-  // We need to be careful to handle the case where var_len_blocks_ is empty,
+  // We need to be careful to handle the case where var_len_pages_ is empty,
   // e.g. if all strings are NULL.
-  uint8_t* block_start = var_len_blocks_.empty() ? NULL :
-      var_len_blocks_[var_len_blocks_index_]->buffer();
+  uint8_t* page_start =
+      var_len_pages_.empty() ? NULL : var_len_pages_[var_len_pages_index_].data();
 
   const vector<SlotDescriptor*>& string_slots = sort_tuple_desc_->string_slots();
   int num_non_null_string_slots = 0;
@@ -1015,47 +1139,47 @@ bool Sorter::Run::ConvertOffsetsToPtrs(Tuple* tuple) {
     DCHECK(slot_desc->type().IsVarLenStringType());
     StringValue* value = reinterpret_cast<StringValue*>(
         tuple->GetSlot(slot_desc->tuple_offset()));
-    // packed_offset includes the block index in the upper 32 bits and the block
+    // packed_offset includes the page index in the upper 32 bits and the page
     // offset in the lower 32 bits. See CopyVarLenDataConvertOffset().
     uint64_t packed_offset = reinterpret_cast<uint64_t>(value->ptr);
-    int block_index = packed_offset >> 32;
-    int block_offset = packed_offset & 0xFFFFFFFF;
+    uint32_t page_index = packed_offset >> 32;
+    uint32_t page_offset = packed_offset & 0xFFFFFFFF;
 
-    if (block_index > var_len_blocks_index_) {
-      // We've reached the block boundary for the current var-len block.
+    if (page_index > var_len_pages_index_) {
+      // We've reached the page boundary for the current var-len page.
       // This tuple will be returned in the next call to GetNext().
-      DCHECK_GE(block_index, 0);
-      DCHECK_LE(block_index, var_len_blocks_.size());
-      DCHECK_EQ(block_index, var_len_blocks_index_ + 1);
-      DCHECK_EQ(block_offset, 0); // The data is the first thing in the next block.
+      DCHECK_GE(page_index, 0);
+      DCHECK_LE(page_index, var_len_pages_.size());
+      DCHECK_EQ(page_index, var_len_pages_index_ + 1);
+      DCHECK_EQ(page_offset, 0); // The data is the first thing in the next page.
       // This must be the first slot with var len data for the tuple. Var len data
       // for tuple shouldn't be split across blocks.
       DCHECK_EQ(num_non_null_string_slots, 1);
       return false;
     }
 
-    DCHECK_EQ(block_index, var_len_blocks_index_);
-    if (var_len_blocks_.empty()) {
+    DCHECK_EQ(page_index, var_len_pages_index_);
+    if (var_len_pages_.empty()) {
       DCHECK_EQ(value->len, 0);
     } else {
-      DCHECK_LE(block_offset + value->len, var_len_blocks_[block_index]->valid_data_len());
+      DCHECK_LE(page_offset + value->len, var_len_pages_[page_index].valid_data_len());
     }
     // Calculate the address implied by the offset and assign it. May be NULL for
-    // zero-length strings if there are no blocks in the run since block_start is NULL.
-    DCHECK(block_start != NULL || block_offset == 0);
-    value->ptr = reinterpret_cast<char*>(block_start + block_offset);
+    // zero-length strings if there are no pages in the run since page_start is NULL.
+    DCHECK(page_start != NULL || page_offset == 0);
+    value->ptr = reinterpret_cast<char*>(page_start + page_offset);
   }
   return true;
 }
 
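The offset-packing scheme shared by CopyVarLenDataConvertOffset() and
ConvertOffsetsToPtrs() stores the var-len page index in the upper 32 bits of the
pointer field and the byte offset within that page in the lower 32 bits. A standalone
round-trip sketch, not part of the patch, with hypothetical index and offset values:

    #include <cassert>
    #include <cstdint>

    int main() {
      uint32_t page_index = 3;      // hypothetical var-len page number
      uint32_t page_offset = 4096;  // hypothetical byte offset within the page

      // Pack as in CopyVarLenDataConvertOffset().
      uint64_t packed_offset =
          (static_cast<uint64_t>(page_index) << 32) | page_offset;

      // Unpack as in ConvertOffsetsToPtrs().
      assert(static_cast<uint32_t>(packed_offset >> 32) == page_index);
      assert(static_cast<uint32_t>(packed_offset & 0xFFFFFFFF) == page_offset);
      return 0;
    }

This is also why Prepare() below rejects page lengths above
numeric_limits<uint32_t>::max(): every valid offset must fit in the lower 32 bits.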
 int64_t Sorter::Run::TotalBytes() const {
   int64_t total_bytes = 0;
-  for (BufferedBlockMgr::Block* block: fixed_len_blocks_) {
-    if (block != NULL) total_bytes += block->valid_data_len();
+  for (const Page& page : fixed_len_pages_) {
+    if (page.is_open()) total_bytes += page.valid_data_len();
   }
 
-  for (BufferedBlockMgr::Block* block: var_len_blocks_) {
-    if (block != NULL) total_bytes += block->valid_data_len();
+  for (const Page& page : var_len_pages_) {
+    if (page.is_open()) total_bytes += page.valid_data_len();
   }
   return total_bytes;
 }
@@ -1072,61 +1196,61 @@ Sorter::TupleIterator::TupleIterator(Sorter::Run* run, int64_t index)
   }
 
   const int tuple_size = run->sort_tuple_size_;
-  int block_offset;
+  uint32_t page_offset;
   if (UNLIKELY(index == run->num_tuples())) {
     // If the iterator is initialized past the end, set up buffer_start_index_,
-    // 'buffer_end_index_' and 'block_index_' for the last block, then set 'tuple' to
+    // 'buffer_end_index_' and 'page_index_' for the last page, then set 'tuple' to
     // 'tuple_size' bytes past the last tuple, so everything is correct when Prev() is
     // invoked.
-    block_index_ = run->fixed_len_blocks_.size() - 1;
-    block_offset = ((index - 1) % run->block_capacity_) * tuple_size + tuple_size;
+    page_index_ = run->fixed_len_pages_.size() - 1;
+    page_offset = ((index - 1) % run->page_capacity_) * tuple_size + tuple_size;
   } else {
-    block_index_ = index / run->block_capacity_;
-    block_offset = (index % run->block_capacity_) * tuple_size;
+    page_index_ = index / run->page_capacity_;
+    page_offset = (index % run->page_capacity_) * tuple_size;
   }
-  buffer_start_index_ = block_index_ * run->block_capacity_;
-  buffer_end_index_ = buffer_start_index_ + run->block_capacity_;
-  tuple_ = run->fixed_len_blocks_[block_index_]->buffer() + block_offset;
+  buffer_start_index_ = page_index_ * run->page_capacity_;
+  buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
+  tuple_ = run->fixed_len_pages_[page_index_].data() + page_offset;
 }
 
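For reference, the constructor's arithmetic divides a run-wide tuple index by
page_capacity_ (the number of tuples per fixed-length page) to find the page, and uses
the remainder times the tuple size as the byte offset. A standalone sketch, not part of
the patch, with hypothetical sizes:

    #include <cassert>
    #include <cstdint>

    int main() {
      const int64_t page_capacity = 1000;  // hypothetical tuples per page
      const int tuple_size = 16;           // hypothetical sort tuple size
      const int64_t index = 2500;          // run-wide tuple index

      int64_t page_index = index / page_capacity;
      uint32_t page_offset = (index % page_capacity) * tuple_size;

      assert(page_index == 2);          // tuple lives in the third page
      assert(page_offset == 500 * 16);  // 500 tuples into that page
      return 0;
    }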
 void Sorter::TupleIterator::Next(Sorter::Run* run, int tuple_size) {
   DCHECK_LT(index_, run->num_tuples()) << "Can only advance one past end of run";
   tuple_ += tuple_size;
   ++index_;
-  if (UNLIKELY(index_ >= buffer_end_index_)) NextBlock(run, tuple_size);
+  if (UNLIKELY(index_ >= buffer_end_index_)) NextPage(run, tuple_size);
 }
 
-void Sorter::TupleIterator::NextBlock(Sorter::Run* run, int tuple_size) {
-  // When moving after the last tuple, stay at the last block.
+void Sorter::TupleIterator::NextPage(Sorter::Run* run, int tuple_size) {
+  // When moving after the last tuple, stay at the last page.
   if (index_ >= run->num_tuples()) return;
-  ++block_index_;
-  DCHECK_LT(block_index_, run->fixed_len_blocks_.size());
-  buffer_start_index_ = block_index_ * run->block_capacity_;
+  ++page_index_;
+  DCHECK_LT(page_index_, run->fixed_len_pages_.size());
+  buffer_start_index_ = page_index_ * run->page_capacity_;
   DCHECK_EQ(index_, buffer_start_index_);
-  buffer_end_index_ = buffer_start_index_ + run->block_capacity_;
-  tuple_ = run->fixed_len_blocks_[block_index_]->buffer();
+  buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
+  tuple_ = run->fixed_len_pages_[page_index_].data();
 }
 
 void Sorter::TupleIterator::Prev(Sorter::Run* run, int tuple_size) {
   DCHECK_GE(index_, 0) << "Can only advance one before start of run";
   tuple_ -= tuple_size;
   --index_;
-  if (UNLIKELY(index_ < buffer_start_index_)) PrevBlock(run, tuple_size);
+  if (UNLIKELY(index_ < buffer_start_index_)) PrevPage(run, tuple_size);
 }
 
-void Sorter::TupleIterator::PrevBlock(Sorter::Run* run, int tuple_size) {
-  // When moving before the first tuple, stay at the first block.
+void Sorter::TupleIterator::PrevPage(Sorter::Run* run, int tuple_size) {
+  // When moving before the first tuple, stay at the first page.
   if (index_ < 0) return;
-  --block_index_;
-  DCHECK_GE(block_index_, 0);
-  buffer_start_index_ = block_index_ * run->block_capacity_;
-  buffer_end_index_ = buffer_start_index_ + run->block_capacity_;
+  --page_index_;
+  DCHECK_GE(page_index_, 0);
+  buffer_start_index_ = page_index_ * run->page_capacity_;
+  buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
   DCHECK_EQ(index_, buffer_end_index_ - 1);
-  int last_tuple_block_offset = run->sort_tuple_size_ * (run->block_capacity_ - 1);
-  tuple_ = run->fixed_len_blocks_[block_index_]->buffer() + last_tuple_block_offset;
+  int last_tuple_page_offset = run->sort_tuple_size_ * (run->page_capacity_ - 1);
+  tuple_ = run->fixed_len_pages_[page_index_].data() + last_tuple_page_offset;
 }
 
-Sorter::TupleSorter::TupleSorter(const TupleRowComparator& comp, int64_t block_size,
+Sorter::TupleSorter::TupleSorter(const TupleRowComparator& comp, int64_t page_size,
     int tuple_size, RuntimeState* state)
   : tuple_size_(tuple_size),
     comparator_(comp),
@@ -1340,13 +1464,15 @@ inline void Sorter::TupleSorter::Swap(Tuple* left, Tuple* right, Tuple* swap_tup
 
 Sorter::Sorter(const TupleRowComparator& compare_less_than,
     const vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
-    MemTracker* mem_tracker, RuntimeProfile* profile, RuntimeState* state,
+    MemTracker* mem_tracker, BufferPool::ClientHandle* buffer_pool_client,
+    int64_t page_len, RuntimeProfile* profile, RuntimeState* state, int node_id,
     bool enable_spilling)
-  : state_(state),
+  : node_id_(node_id),
+    state_(state),
     compare_less_than_(compare_less_than),
     in_mem_tuple_sorter_(NULL),
-    block_mgr_(state->block_mgr()),
-    block_mgr_client_(NULL),
+    buffer_pool_client_(buffer_pool_client),
+    page_len_(page_len),
     has_var_len_slots_(false),
     sort_tuple_exprs_(sort_tuple_exprs),
     mem_tracker_(mem_tracker),
@@ -1370,10 +1496,24 @@ Sorter::~Sorter() {
 
 Status Sorter::Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) {
   DCHECK(in_mem_tuple_sorter_ == NULL) << "Already prepared";
+  // Page byte offsets are packed into uint32_t values, which limits the supported
+  // page size.
+  if (page_len_ > numeric_limits<uint32_t>::max()) {
+    return Status(Substitute(
+          "Page size $0 exceeded maximum supported in sorter ($1)",
+          PrettyPrinter::PrintBytes(page_len_),
+          PrettyPrinter::PrintBytes(numeric_limits<uint32_t>::max())));
+  }
+
   TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0];
+  if (sort_tuple_desc->byte_size() > page_len_) {
+    return Status(TErrorCode::MAX_ROW_SIZE,
+        PrettyPrinter::Print(sort_tuple_desc->byte_size(), TUnit::BYTES), node_id_,
+        PrettyPrinter::Print(0, TUnit::BYTES));
+  }
   has_var_len_slots_ = sort_tuple_desc->HasVarlenSlots();
-  in_mem_tuple_sorter_.reset(new TupleSorter(compare_less_than_,
-      block_mgr_->max_block_size(), sort_tuple_desc->byte_size(), state_));
+  in_mem_tuple_sorter_.reset(new TupleSorter(compare_less_than_, page_len_,
+      sort_tuple_desc->byte_size(), state_));
 
   initial_runs_counter_ = ADD_COUNTER(profile_, "InitialRunsCreated", TUnit::UNIT);
   spilled_runs_counter_ = ADD_COUNTER(profile_, "SpilledRuns", TUnit::UNIT);
@@ -1382,17 +1522,6 @@ Status Sorter::Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) {
   sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES);
   run_sizes_ = ADD_SUMMARY_STATS_COUNTER(profile_, "NumRowsPerRun", TUnit::UNIT);
 
-  // If spilling is enabled, we need enough buffers to perform merges. Otherwise, there
-  // won't be any merges and we only need 1 buffer.
-  // Must be kept in sync with SortNode.computeResourceProfile() in fe.
-  int min_buffers_required = enable_spilling_ ? MIN_BUFFERS_PER_MERGE : 1;
-  // Fixed and var-length blocks are separate, so we need twice as many blocks for both if
-  // there is var-length data.
-  if (sort_tuple_desc->HasVarlenSlots()) min_buffers_required *= 2;
-
-  RETURN_IF_ERROR(block_mgr_->RegisterClient(Substitute("Sorter ptr=$0", this),
-      min_buffers_required, false, mem_tracker_, state_, &block_mgr_client_));
-
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(sort_tuple_exprs_, state_, obj_pool,
       expr_mem_pool, &sort_tuple_expr_evals_));
   return Status::OK();
@@ -1413,6 +1542,15 @@ void Sorter::FreeLocalAllocations() {
   ScalarExprEvaluator::FreeLocalAllocations(sort_tuple_expr_evals_);
 }
 
+int64_t Sorter::ComputeMinReservation() {
+  // Must be kept in sync with SortNode.computeNodeResourceProfile() in fe.
+  int min_buffers_required = enable_spilling_ ? MIN_BUFFERS_PER_MERGE : 1;
+  // Fixed and var-length pages are separate, so we need double the pages
+  // if there is var-length data.
+  if (output_row_desc_->HasVarlenSlots()) min_buffers_required *= 2;
+  return min_buffers_required * page_len_;
+}
+
 Status Sorter::AddBatch(RowBatch* batch) {
   DCHECK(unsorted_run_ != NULL);
   DCHECK(batch != NULL);
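A standalone sketch, not part of the patch, of the arithmetic in
ComputeMinReservation() above: spilling needs MIN_BUFFERS_PER_MERGE (3) buffers rather
than 1, and var-len data doubles the count because fixed- and var-length pages are kept
separately. The 8MB page length here is a hypothetical value:

    #include <cassert>
    #include <cstdint>

    int64_t MinReservation(bool spilling, bool has_var_len_slots, int64_t page_len) {
      int min_buffers_required = spilling ? 3 : 1;  // MIN_BUFFERS_PER_MERGE or 1
      if (has_var_len_slots) min_buffers_required *= 2;
      return min_buffers_required * page_len;
    }

    int main() {
      const int64_t page_len = 8 << 20;  // hypothetical 8MB page
      // Spilling sort with var-len slots: 3 * 2 = 6 pages = 48MB minimum.
      assert(MinReservation(true, true, page_len) == 6 * page_len);
      // Non-spilling sort, fixed-length only: a single page suffices.
      assert(MinReservation(false, false, page_len) == page_len);
      return 0;
    }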
@@ -1424,11 +1562,12 @@ Status Sorter::AddBatch(RowBatch* batch) {
 
     cur_batch_index += num_processed;
     if (cur_batch_index < batch->num_rows()) {
-      // The current run is full. Sort it and begin the next one.
+      // The current run is full. Sort it, spill it and begin the next one.
+      RETURN_IF_ERROR(state_->StartSpilling(mem_tracker_));
       RETURN_IF_ERROR(SortCurrentInputRun());
-      RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks());
-      unsorted_run_ = obj_pool_.Add(
-          new Run(this, output_row_desc_->tuple_descriptors()[0], true));
+      RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
+      unsorted_run_ =
+          obj_pool_.Add(new Run(this, output_row_desc_->tuple_descriptors()[0], true));
       RETURN_IF_ERROR(unsorted_run_->Init());
     }
   }
@@ -1459,7 +1598,7 @@ Status Sorter::InputDone() {
   // Unpin the final run to free up memory for the merge.
   // TODO: we could keep it in memory in some circumstances as an optimisation, once
   // we have a buffer pool with more reliable reservations (IMPALA-3200).
-  RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks());
+  RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
 
   // Merge intermediate runs until we have a final merge set-up.
   // TODO: Attempt to allocate more memory before doing intermediate merges. This may
@@ -1487,7 +1626,6 @@ void Sorter::Reset() {
 
 void Sorter::Close(RuntimeState* state) {
   CleanupAllRuns();
-  block_mgr_->ClearReservations(block_mgr_client_);
   obj_pool_.Clear();
   ScalarExprEvaluator::Close(sort_tuple_expr_evals_, state);
 }
@@ -1495,9 +1633,9 @@ void Sorter::Close(RuntimeState* state) {
 void Sorter::CleanupAllRuns() {
   Run::CleanupRuns(&sorted_runs_);
   Run::CleanupRuns(&merging_runs_);
-  if (unsorted_run_ != NULL) unsorted_run_->DeleteAllBlocks();
+  if (unsorted_run_ != NULL) unsorted_run_->CloseAllPages();
   unsorted_run_ = NULL;
-  if (merge_output_run_ != NULL) merge_output_run_->DeleteAllBlocks();
+  if (merge_output_run_ != NULL) merge_output_run_->CloseAllPages();
   merge_output_run_ = NULL;
 }
 
@@ -1519,10 +1657,10 @@ Status Sorter::SortCurrentInputRun() {
 
 Status Sorter::MergeIntermediateRuns() {
   DCHECK_GE(sorted_runs_.size(), 2);
-  int pinned_blocks_per_run = has_var_len_slots_ ? 2 : 1;
-  int max_runs_per_final_merge = MAX_BUFFERS_PER_MERGE / pinned_blocks_per_run;
+  int pinned_pages_per_run = has_var_len_slots_ ? 2 : 1;
+  int max_runs_per_final_merge = MAX_BUFFERS_PER_MERGE / pinned_pages_per_run;
 
-  // During an intermediate merge, the one or two blocks from the output sorted run
+  // During an intermediate merge, the one or two pages from the output sorted run
   // that are being written must be pinned.
   int max_runs_per_intermediate_merge = max_runs_per_final_merge - 1;
   DCHECK_GT(max_runs_per_intermediate_merge, 1);
@@ -1549,7 +1687,7 @@ Status Sorter::MergeIntermediateRuns() {
     if (sorted_runs_.empty()) {
       // Don't need intermediate run for final merge.
       if (merge_output_run_ != NULL) {
-        merge_output_run_->DeleteAllBlocks();
+        merge_output_run_->CloseAllPages();
         merge_output_run_ = NULL;
       }
       return Status::OK();
@@ -1604,7 +1742,8 @@ Status Sorter::CreateMerger(int max_num_runs) {
 }
 
 Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) {
-  RowBatch intermediate_merge_batch(output_row_desc_, state_->batch_size(), mem_tracker_);
+  RowBatch intermediate_merge_batch(
+      output_row_desc_, state_->batch_size(), mem_tracker_);
   bool eos = false;
   while (!eos) {
     // Copy rows into the new run until done.
@@ -1621,5 +1760,4 @@ Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) {
   RETURN_IF_ERROR(merged_run->FinalizeInput());
   return Status::OK();
 }
-
 } // namespace impala
