http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/runtime/sorted-run-merger.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.h b/be/src/runtime/sorted-run-merger.h
index d622fd9..6ffc544 100644
--- a/be/src/runtime/sorted-run-merger.h
+++ b/be/src/runtime/sorted-run-merger.h
@@ -29,7 +29,7 @@ class RowDescriptor;
 class RuntimeProfile;
 
 /// SortedRunMerger is used to merge multiple sorted runs of tuples. A run is a sorted
-/// sequence of row batches, which are fetched from a RunBatchSupplier function object.
+/// sequence of row batches, which are fetched from a RunBatchSupplierFn function object.
 /// Merging is implemented using a binary min-heap that maintains the run with the next
 /// tuple in sorted order at the top of the heap.
 ///
@@ -39,12 +39,18 @@ class RuntimeProfile;
 /// If false, GetNext() only copies tuple pointers (TupleRows) into the output batch,
 /// and transfers resource ownership from the input batches to the output batch when
 /// an input batch is processed.
+///
+/// SortedRunMerger cannot handle the 'need_to_return' resource-transfer model, so if
+/// the RunBatchSupplierFn can return batches with the 'need_to_return' flag set, the
+/// merger must be constructed with 'deep_copy_input' set to true. TODO: once
+/// 'need_to_return' is deprecated, this is no longer a problem.
 class SortedRunMerger {
  public:
   /// Function that returns the next batch of rows from an input sorted run. The batch
-  /// is owned by the supplier (i.e. not SortedRunMerger). eos is indicated by an NULL
-  /// batch being returned.
-  typedef boost::function<Status (RowBatch**)> RunBatchSupplier;
+  /// is owned by the supplier (i.e. not SortedRunMerger). eos is indicated by a NULL
+  /// batch being returned. The returned batch can have any number of rows (including
+  /// zero).
+  typedef boost::function<Status (RowBatch**)> RunBatchSupplierFn;
 
   SortedRunMerger(const TupleRowComparator& comparator, RowDescriptor* row_desc,
       RuntimeProfile* profile, bool deep_copy_input);
@@ -52,17 +58,22 @@ class SortedRunMerger {
   /// Prepare this merger to merge and return rows from the sorted runs in 'input_runs'.
   /// Retrieves the first batch from each run and sets up the binary heap implementing
   /// the priority queue.
-  Status Prepare(const std::vector<RunBatchSupplier>& input_runs);
+  Status Prepare(const std::vector<RunBatchSupplierFn>& input_runs);
 
   /// Return the next batch of sorted rows from this merger.
   Status GetNext(RowBatch* output_batch, bool* eos);
 
-  /// Called to finalize a merge when deep_copy is false. Transfers resources from
-  /// all input batches to the specified output batch.
-  void TransferAllResources(RowBatch* transfer_resource_batch);
-
  private:
-  class BatchedRowSupplier;
+  class SortedRunWrapper;
+
+  /// Remove the current row from the current min RunBatchSupplierFn and try to advance to
+  /// the next row. If 'deep_copy_input_' is false, 'transfer_batch' must be supplied to
+  /// attach resources to.
+  ///
+  /// When AdvanceMinRow returns, the previous min is advanced to the next row and the
+  /// heap is reordered accordingly. The RunBatchSupplierFn is removed from the heap if
+  /// this was its last row. Any completed resources are transferred to the batch.
+  Status AdvanceMinRow(RowBatch* transfer_batch);
 
   /// Assuming the element at parent_index is the only out of place element in the heap,
   /// restore the heap property (i.e. swap elements so parent <= children).
@@ -73,9 +84,9 @@ class SortedRunMerger {
   /// and the children of the element at index i are 2*i+1 and 2*i+2. The heap property is
   /// that row of the parent element is <= the rows of the child elements according to the
   /// comparator comparator_.
-  /// The BatchedRowSupplier objects used in the min_heap_ are owned by this
+  /// The SortedRunWrapper objects used in the min_heap_ are owned by this
   /// SortedRunMerger instance.
-  std::vector<BatchedRowSupplier*> min_heap_;
+  std::vector<SortedRunWrapper*> min_heap_;
 
   /// Row comparator. Returns true if lhs < rhs.
   TupleRowComparator comparator_;
@@ -87,7 +98,7 @@ class SortedRunMerger {
   /// True if rows must be deep copied into the output batch.
   bool deep_copy_input_;
 
-  /// Pool of BatchedRowSupplier instances.
+  /// Pool of SortedRunWrapper instances.
   ObjectPool pool_;
 
   /// Times calls to GetNext().
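
For reference, the min-heap merge described in the class comment can be illustrated with
a small standalone sketch. Plain vectors of ints stand in for RunBatchSupplierFn and
RowBatch here; it is a simplified analogue of the pattern, not Impala code.

#include <functional>
#include <iostream>
#include <queue>
#include <tuple>
#include <vector>

// Merge k sorted runs. The heap keeps the run whose next element is smallest on top,
// mirroring how SortedRunMerger keeps the run with the next tuple in sorted order at
// the top of its min-heap.
std::vector<int> MergeSortedRuns(const std::vector<std::vector<int>>& runs) {
  // Heap entry: (next value, run index, index of the element after it in that run).
  using Entry = std::tuple<int, size_t, size_t>;
  std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> min_heap;
  for (size_t r = 0; r < runs.size(); ++r) {
    if (!runs[r].empty()) min_heap.emplace(runs[r][0], r, 1);
  }
  std::vector<int> out;
  while (!min_heap.empty()) {
    auto [value, run, next] = min_heap.top();
    min_heap.pop();
    out.push_back(value);
    // Re-insert the run with its next element, or drop it once it is exhausted,
    // just as the merger removes a run from the heap when its supplier reports eos.
    if (next < runs[run].size()) min_heap.emplace(runs[run][next], run, next + 1);
  }
  return out;
}

int main() {
  std::vector<std::vector<int>> runs = {{1, 4, 7}, {2, 5, 8}, {0, 3, 6, 9}};
  for (int v : MergeSortedRuns(runs)) std::cout << v << ' ';
  std::cout << std::endl;  // prints: 0 1 2 3 4 5 6 7 8 9
  return 0;
}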

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 299dd2f..c242b6b 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -45,105 +45,193 @@ const string MEM_ALLOC_FAILED_ERROR_MSG = "Failed to 
allocate block for $0-lengt
     "data needed for sorting. Reducing query concurrency or increasing the "
     "memory limit may help this query to complete successfully.";
 
-/// A run is a sequence of blocks containing tuples that are or will 
eventually be in
-/// sorted order.
-/// A run may maintain two sequences of blocks - one containing the tuples 
themselves,
-/// (i.e. fixed-len slots and ptrs to var-len data), and the other for the 
var-length
-/// column data pointed to by those tuples.
-/// Tuples in a run may be sorted in place (in-memory) and merged using a 
merger.
+/// Delete all non-null blocks in 'blocks' and clear vector.
+static void DeleteAndClearBlocks(vector<BufferedBlockMgr::Block*>* blocks) {
+  for (BufferedBlockMgr::Block* block: *blocks) {
+    if (block != NULL) block->Delete();
+  }
+  blocks->clear();
+}
+
+static int NumNonNullBlocks(const vector<BufferedBlockMgr::Block*>& blocks) {
+  int count = 0;
+  for (BufferedBlockMgr::Block* block: blocks) {
+    if (block != NULL) ++count;
+  }
+  return count;
+}
+
+/// A run is a sequence of tuples. The run can be sorted or unsorted (in which case the
+/// Sorter will sort it). A run comprises a sequence of fixed-length blocks containing the
+/// tuples themselves (i.e. fixed-len slots that may contain ptrs to var-length data), and
+/// an optional sequence of var-length blocks containing the var-length data.
+///
+/// Runs are either "initial runs" constructed from the sorter's input by evaluating
+/// the expressions in 'sort_tuple_slot_expr_ctxs_' or "intermediate runs" constructed
+/// by merging already-sorted runs. Initial runs are sorted in-place in memory. Once
+/// sorted, runs can be spilled to disk to free up memory. Sorted runs are merged by
+/// SortedRunMerger, either to produce the final sorted output or to produce another
+/// sorted run.
+///
+/// The expected calling sequence of functions is as follows:
+/// * Init() to initialize the run and allocate initial blocks.
+/// * Add*Batch() to add batches of tuples to the run.
+/// * FinalizeInput() to signal that no more batches will be added.
+/// * If the run is unsorted, it must be sorted. After that, set_sorted() must be called.
+/// * Once sorted, the run is ready to read in sorted order for merging or final output.
+/// * PrepareRead() to allocate resources for reading the run.
+/// * GetNext() (if there was a single run) or GetNextBatch() (when merging multiple runs)
+///   to read from the run.
+/// * Once reading is done, DeleteAllBlocks() should be called to free resources.
 class Sorter::Run {
  public:
-  /// materialize_slots is true for runs constructed from input rows. The 
input rows are
-  /// materialized into single sort tuples using the expressions in
-  /// sort_tuple_slot_expr_ctxs_. For intermediate merges, the tuples are 
already
-  /// materialized so materialize_slots is false.
-  Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool 
materialize_slots);
+  Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool initial_run);
 
-  ~Run() { DeleteAllBlocks(); }
+  ~Run() {
+    DCHECK(fixed_len_blocks_.empty());
+    DCHECK(var_len_blocks_.empty());
+    DCHECK(var_len_copy_block_ == NULL);
+  }
 
   /// Initialize the run for input rows by allocating the minimum number of 
required
   /// blocks - one block for fixed-len data added to fixed_len_blocks_, one 
for the
   /// initially unsorted var-len data added to var_len_blocks_, and one to 
copy sorted
-  /// var-len data into (var_len_copy_block_).
+  /// var-len data into var_len_copy_block_.
   Status Init();
 
-  /// Add a batch of input rows to the current run. Returns the number
-  /// of rows actually added in num_processed. If the run is full (no more 
blocks can
-  /// be allocated), num_processed may be less than the number of rows in the 
batch.
-  /// If materialize_slots_ is true, materializes the input rows using the 
expressions
-  /// in sorter_->sort_tuple_slot_expr_ctxs_, else just copies the input rows.
-  template <bool has_var_len_data>
-  Status AddBatch(RowBatch* batch, int start_index, int* num_processed);
+  /// Add the rows from 'batch' starting at 'start_index' to the current run. Returns the
+  /// number of rows actually added in 'num_processed'. If the run is full (no more blocks
+  /// can be allocated), 'num_processed' may be less than the number of remaining rows in
+  /// the batch. AddInputBatch() materializes the input rows using the expressions in
+  /// sorter_->sort_tuple_slot_expr_ctxs_, while AddIntermediateBatch() just copies rows.
+  Status AddInputBatch(RowBatch* batch, int start_index, int* num_processed) {
+    DCHECK(initial_run_);
+    if (has_var_len_slots_) {
+      return AddBatchInternal<true, true>(batch, start_index, num_processed);
+    } else {
+      return AddBatchInternal<false, true>(batch, start_index, num_processed);
+    }
+  }
+  Status AddIntermediateBatch(RowBatch* batch, int start_index, int* 
num_processed) {
+    DCHECK(!initial_run_);
+    if (has_var_len_slots_) {
+      return AddBatchInternal<true, false>(batch, start_index, num_processed);
+    } else {
+      return AddBatchInternal<false, false>(batch, start_index, num_processed);
+    }
+  }
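
The AddInputBatch()/AddIntermediateBatch() pair above dispatches runtime flags into
template arguments once per batch so the per-row loop can be specialized at compile
time. A minimal standalone sketch of that dispatch pattern follows (illustrative names
and byte counts, not Impala's types).

#include <cstdio>

class Appender {
 public:
  Appender(bool has_var_len_slots, bool initial_run)
    : has_var_len_slots_(has_var_len_slots), initial_run_(initial_run) {}

  int AddBatch(int num_rows) {
    // Translate the two runtime flags into template arguments exactly once per batch.
    if (has_var_len_slots_) {
      return initial_run_ ? AddBatchInternal<true, true>(num_rows)
                          : AddBatchInternal<true, false>(num_rows);
    }
    return initial_run_ ? AddBatchInternal<false, true>(num_rows)
                        : AddBatchInternal<false, false>(num_rows);
  }

 private:
  template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
  int AddBatchInternal(int num_rows) {
    int bytes = 0;
    for (int i = 0; i < num_rows; ++i) {
      bytes += 16;                        // fixed-len portion of each row
      if (HAS_VAR_LEN_SLOTS) bytes += 8;  // branch folded away when the flag is false
      if (INITIAL_RUN) bytes += 4;        // e.g. extra materialization work
    }
    return bytes;
  }

  const bool has_var_len_slots_;
  const bool initial_run_;
};

int main() {
  Appender a(/*has_var_len_slots=*/true, /*initial_run=*/false);
  std::printf("%d\n", a.AddBatch(10));  // 240
  return 0;
}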
 
-  /// Attaches all fixed-len and var-len blocks to the given row batch.
-  void TransferResources(RowBatch* row_batch);
+  /// Called after the final call to Add*Batch() to do any bookkeeping 
necessary to
+  /// finalize the run. Must be called before sorting or merging the run.
+  Status FinalizeInput();
 
   /// Unpins all the blocks in a sorted run. Var-length column data is copied 
into new
   /// blocks in sorted order. Pointers in the original tuples are converted to 
offsets
-  /// from the beginning of the sequence of var-len data blocks.
+  /// from the beginning of the sequence of var-len data blocks. Returns an 
error and
+  /// may leave some blocks pinned if an error is encountered in the block mgr.
   Status UnpinAllBlocks();
 
   /// Deletes all blocks.
   void DeleteAllBlocks();
 
-  /// Interface for merger - get the next batch of rows from this run. The 
callee (Run)
-  /// still owns the returned batch. Calls GetNext(RowBatch*, bool*).
-  Status GetNextBatch(RowBatch** sorted_batch);
+  /// Prepare to read a sorted run. Pins the first block(s) in the run if the 
run was
+  /// previously unpinned.
+  Status PrepareRead();
 
- private:
-  friend class Sorter;
-  friend class TupleSorter;
+  /// Interface for merger - get the next batch of rows from this run. This 
run still
+  /// owns the returned batch. Calls GetNext(RowBatch*, bool*).
+  Status GetNextBatch(RowBatch** sorted_batch);
 
-  /// Fill output_batch with rows from this run. If convert_offset_to_ptr is 
true, offsets
+  /// Fill output_batch with rows from this run. If CONVERT_OFFSET_TO_PTR is 
true, offsets
   /// in var-length slots are converted back to pointers. Only row pointers 
are copied
-  /// into output_batch.
-  /// If this run was unpinned, one block (2 if there are var-len slots) is 
pinned while
+  /// into output_batch. eos is set to true after all rows from the run are 
returned.
+  /// If eos is true, the returned output_batch has zero rows and has no 
attached blocks.
+  /// If this run was unpinned, one block (two if there are var-len slots) is 
pinned while
   /// rows are filled into output_batch. The block is unpinned before the next 
block is
-  /// pinned. Atmost 1 (2) block(s) will be pinned at any time.
-  /// If the run was pinned, the blocks are not unpinned (Sorter holds on to 
the memory).
-  /// In either case, all rows in output_batch will have their fixed and 
var-len data from
-  /// the same block.
-  /// TODO: If we leave the last run to be merged in memory, the fixed-len 
blocks can be
-  /// unpinned as they are consumed.
-  template <bool convert_offset_to_ptr>
+  /// pinned, so at most one (two if there are var-len slots) block(s) will be 
pinned at
+  /// once. If the run was pinned, the blocks are not unpinned and each block 
is attached
+  /// to 'output_batch' once all rows referencing data in the block have been 
returned,
+  /// either in the current batch or previous batches. In both pinned and 
unpinned cases,
+  /// all rows in output_batch will reference at most one fixed-len and one 
var-len block.
+  template <bool CONVERT_OFFSET_TO_PTR>
   Status GetNext(RowBatch* output_batch, bool* eos);
 
-  /// Check if a run can be extended by allocating additional blocks from the 
block
-  /// manager. Always true when building a sorted run in an intermediate 
merge, because
-  /// the current block(s) can be unpinned before getting the next free block 
(so a block
-  /// is always available)
-  bool CanExtendRun() const;
+  /// Delete all blocks in 'runs' and clear 'runs'.
+  static void CleanupRuns(deque<Run*>* runs) {
+    for (Run* run: *runs) {
+      run->DeleteAllBlocks();
+    }
+    runs->clear();
+  }
+
+  /// Return total amount of fixed and var len data in run, not including 
blocks that
+  /// were already transferred.
+  int64_t TotalBytes() const;
 
-  /// Collect the non-null var-len (e.g. STRING) slots from 'src' in var_slots 
and return
-  /// the total length of all var_len slots in total_var_len.
-  void CollectNonNullVarSlots(Tuple* src, vector<StringValue*>* var_len_values,
-      int* total_var_len);
+  inline bool is_pinned() const { return is_pinned_; }
+  inline bool is_finalized() const { return is_finalized_; }
+  inline bool is_sorted() const { return is_sorted_; }
+  inline void set_sorted() { is_sorted_ = true; }
+  inline int64_t num_tuples() const { return num_tuples_; }
+
+ private:
+  /// TupleIterator needs access to internals to iterate over tuples.
+  friend class TupleIterator;
 
-  /// Check if the current run can be extended by a block. Add the newly 
allocated block
-  /// to block_sequence, or set added to false if the run could not be 
extended.
-  /// If the run is sorted (produced by an intermediate merge), unpin the last 
block in
-  /// block_sequence before allocating and adding a new block - the run can 
always be
-  /// extended in this case. If the run is unsorted, check 
max_blocks_in_unsorted_run_
-  /// to see if a block can be added to the run. Also updates the sort bytes 
counter.
-  Status TryAddBlock(vector<BufferedBlockMgr::Block*>* block_sequence, bool* 
added);
+  /// Templatized implementation of Add*Batch() functions.
+  /// INITIAL_RUN and HAS_VAR_LEN_SLOTS are template arguments for performance and must
+  /// match 'initial_run_' and 'has_var_len_slots_'.
+  template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
+  Status AddBatchInternal(RowBatch* batch, int start_index, int* num_processed);
 
-  /// Prepare to read a sorted run. Pins the first block(s) in the run if the 
run was
-  /// previously unpinned.
-  Status PrepareRead();
+  /// Finalize the list of blocks: delete empty final blocks and unpin the 
previous block
+  /// if the run is unpinned.
+  Status FinalizeBlocks(vector<BufferedBlockMgr::Block*>* blocks);
 
-  /// Copy the StringValue data in var_values to dest in order and update the 
StringValue
-  /// ptrs to point to the copied data.
+  /// Collect the non-null var-len (e.g. STRING) slots from 'src' in 
'var_len_values' and
+  /// return the total length of all var-len values in 'total_var_len'.
+  void CollectNonNullVarSlots(Tuple* src, vector<StringValue*>* var_len_values,
+      int* total_var_len);
+
+  enum AddBlockMode { KEEP_PREV_PINNED, UNPIN_PREV };
+
+  /// Try to extend the current run by a block. If 'mode' is KEEP_PREV_PINNED, try to
+  /// allocate a new block, which may fail to extend the run due to lack of memory. If
+  /// mode is 'UNPIN_PREV', unpin the previous block in block_sequence before allocating
+  /// and adding a new block - this never fails due to lack of memory.
+  ///
+  /// Returns an error status only if the block manager returns an error. If no error is
+  /// encountered, sets 'added' to indicate whether the run was extended and returns
+  /// Status::OK(). The new block is appended to 'block_sequence'.
+  Status TryAddBlock(AddBlockMode mode, vector<BufferedBlockMgr::Block*>* block_sequence,
+      bool* added);
+
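
A standalone sketch of the two AddBlockMode behaviours, using a toy pin budget in place
of BufferedBlockMgr (names and the budget are illustrative only): with UNPIN_PREV the
previous block is unpinned before the new one is pinned, so extending the run cannot
fail for lack of memory, while KEEP_PREV_PINNED may fail.

#include <cassert>
#include <vector>

struct ToyBlockMgr {
  int pinned_blocks = 0;
  int max_pinned_blocks = 2;

  // Returns true if one more pinned block fits within the budget.
  bool TryPinNewBlock() {
    if (pinned_blocks >= max_pinned_blocks) return false;
    ++pinned_blocks;
    return true;
  }
  void Unpin() { --pinned_blocks; }
};

enum AddBlockMode { KEEP_PREV_PINNED, UNPIN_PREV };

// Mirrors the contract described above: with UNPIN_PREV the previous block is unpinned
// first, so the new block always fits; with KEEP_PREV_PINNED the attempt may fail and
// the return value reports whether the run was extended.
bool TryAddBlock(ToyBlockMgr* mgr, AddBlockMode mode, std::vector<int>* blocks) {
  if (mode == UNPIN_PREV && !blocks->empty()) mgr->Unpin();
  if (!mgr->TryPinNewBlock()) return false;
  blocks->push_back(static_cast<int>(blocks->size()));
  return true;
}

int main() {
  ToyBlockMgr mgr;
  std::vector<int> run;
  assert(TryAddBlock(&mgr, KEEP_PREV_PINNED, &run));   // block 0
  assert(TryAddBlock(&mgr, KEEP_PREV_PINNED, &run));   // block 1, budget now full
  assert(!TryAddBlock(&mgr, KEEP_PREV_PINNED, &run));  // fails: out of budget
  assert(TryAddBlock(&mgr, UNPIN_PREV, &run));         // succeeds: prev unpinned first
  return 0;
}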
+  /// Advance to the next read block. If the run is pinned, has no effect. If the run
+  /// is unpinned, atomically pin the block at 'block_index' + 1 in 'blocks' and delete
+  /// the block at 'block_index'.
+  Status PinNextReadBlock(vector<BufferedBlockMgr::Block*>* blocks, int block_index);
+
+  /// Copy the StringValues in 'var_values' to 'dest' in order and update the StringValue
+  /// ptrs in 'dest' to point to the copied data.
   void CopyVarLenData(const vector<StringValue*>& var_values, uint8_t* dest);
 
-  /// Copy the StringValue in var_values to dest in order. Update the StringValue ptrs to
-  /// contain a packed offset for the copied data comprising block_index and the offset
-  /// relative to block_start.
+  /// Copy the StringValues in 'var_values' to 'dest' in order. Update the StringValue
+  /// ptrs in 'dest' to contain a packed offset for the copied data comprising
+  /// block_index and the offset relative to block_start.
   void CopyVarLenDataConvertOffset(const vector<StringValue*>& var_values,
       int block_index, const uint8_t* block_start, uint8_t* dest);
 
-  /// Returns true if we have var-len slots and there are var-len blocks.
+  /// Convert encoded offsets to valid pointers in tuple with layout 'sort_tuple_desc_'.
+  /// 'tuple' is modified in-place. Returns true if the pointers refer to the block at
+  /// 'var_len_blocks_index_' and were successfully converted or false if the var len
+  /// data is in the next block, in which case 'tuple' is unmodified.
+  bool ConvertOffsetsToPtrs(Tuple* tuple);
+
+  /// Returns true if we have var-len blocks in the run.
   inline bool HasVarLenBlocks() const {
-    return has_var_len_slots_ && !var_len_blocks_.empty();
+    // Shouldn't have any blocks unless there are slots.
+    DCHECK(var_len_blocks_.empty() || has_var_len_slots_);
+    return !var_len_blocks_.empty();
   }
 
   /// Parent sorter object.
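
CopyVarLenDataConvertOffset() and ConvertOffsetsToPtrs() rely on a 64-bit packed
encoding: the var-len block index goes in the upper 32 bits and the byte offset within
that block in the lower 32 bits. A standalone sketch of the encode/decode (the helper
names are illustrative, not Impala's):

#include <cassert>
#include <cstdint>

inline uint64_t PackOffset(uint32_t block_index, uint32_t block_offset) {
  // Block index in the upper 32 bits, offset within the block in the lower 32 bits.
  return (static_cast<uint64_t>(block_index) << 32) | block_offset;
}

inline void UnpackOffset(uint64_t packed, uint32_t* block_index, uint32_t* block_offset) {
  *block_index = static_cast<uint32_t>(packed >> 32);
  *block_offset = static_cast<uint32_t>(packed & 0xFFFFFFFF);
}

int main() {
  uint64_t packed = PackOffset(3, 4096);
  uint32_t block_index, block_offset;
  UnpackOffset(packed, &block_index, &block_offset);
  assert(block_index == 3 && block_offset == 4096);
  return 0;
}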
@@ -153,67 +241,146 @@ class Sorter::Run {
   /// sort_tuple_desc_) before sorting.
   const TupleDescriptor* sort_tuple_desc_;
 
-  /// Sizes of sort tuple and block.
+  /// The size in bytes of the sort tuple.
   const int sort_tuple_size_;
 
-  const bool has_var_len_slots_;
+  /// Number of tuples per block in a run.
+  const int block_capacity_;
 
-  /// True if the sort tuple must be materialized from the input batch in 
AddBatch().
-  /// materialize_slots_ is true for runs being constructed from input 
batches, and
-  /// is false for runs being constructed from intermediate merges.
-  const bool materialize_slots_;
+  const bool has_var_len_slots_;
 
-  /// True if the run is sorted. Set to true after an in-memory sort, and 
initialized to
-  /// true for runs resulting from merges.
-  bool is_sorted_;
+  /// True if this is an initial run. False implies this is a sorted intermediate run
+  /// resulting from merging other runs.
+  const bool initial_run_;
 
-  /// True if all blocks in the run are pinned.
+  /// True if all blocks in the run are pinned. Initial runs start off pinned 
and
+  /// can be unpinned. Intermediate runs are always unpinned.
   bool is_pinned_;
 
+  /// True after FinalizeInput() is called. No more tuples can be added after 
the
+  /// run is finalized.
+  bool is_finalized_;
+
+  /// True if the tuples in the run are currently in sorted order.
+  /// Always true for intermediate runs.
+  bool is_sorted_;
+
   /// Sequence of blocks in this run containing the fixed-length portion of 
the sort
   /// tuples comprising this run. The data pointed to by the var-len slots are 
in
-  /// var_len_blocks_.
-  /// If is_sorted_ is true, the tuples in fixed_len_blocks_ will be in sorted 
order.
-  /// fixed_len_blocks_[i] is NULL iff it has been deleted.
+  /// var_len_blocks_. A run can have zero blocks if no rows are appended.
+  /// If the run is sorted, the tuples in fixed_len_blocks_ will be in sorted 
order.
+  /// fixed_len_blocks_[i] is NULL iff it has been transferred or deleted.
   vector<BufferedBlockMgr::Block*> fixed_len_blocks_;
 
   /// Sequence of blocks in this run containing the var-length data 
corresponding to the
-  /// var-length columns from fixed_len_blocks_. These are reconstructed to be 
in sorted
-  /// order in UnpinAllBlocks().
-  /// var_len_blocks_[i] is NULL iff it has been deleted.
+  /// var-length columns from fixed_len_blocks_. In intermediate runs, the 
var-len data is
+  /// always stored in the same order as the fixed-length tuples. In initial 
runs, the
+  /// var-len data is initially in unsorted order, but is reshuffled into 
sorted order in
+  /// UnpinAllBlocks(). A run can have no var len blocks if there are no var 
len slots or
+  /// if all the var len data is empty or NULL.
+  /// var_len_blocks_[i] is NULL iff it has been transferred or deleted.
   vector<BufferedBlockMgr::Block*> var_len_blocks_;
 
-  /// If there are var-len slots, an extra pinned block is used to copy out 
var-len data
-  /// into a new sequence of blocks in sorted order. var_len_copy_block_ 
stores this
-  /// extra allocated block.
+  /// For initial unsorted runs, an extra pinned block is needed to reorder 
var-len data
+  /// into fixed order in UnpinAllBlocks(). 'var_len_copy_block_' stores this 
extra
+  /// block. Deleted in UnpinAllBlocks().
+  /// TODO: in case of in-memory runs, this could be deleted earlier to free 
up memory.
   BufferedBlockMgr::Block* var_len_copy_block_;
 
-  /// Number of tuples so far in this run.
+  /// Number of tuples added so far to this run.
   int64_t num_tuples_;
 
   /// Number of tuples returned via GetNext(), maintained for debug purposes.
   int64_t num_tuples_returned_;
 
-  /// buffered_batch_ is used to return TupleRows to the merger when this run 
is being
-  /// merged. buffered_batch_ is returned in calls to GetNextBatch().
+  /// Used to implement GetNextBatch() interface required for the merger.
   scoped_ptr<RowBatch> buffered_batch_;
 
   /// Members used when a run is read in GetNext().
-  /// The index into the fixed_ and var_len_blocks_ vectors of the current 
blocks being
-  /// processed in GetNext().
+  /// The index into 'fixed_' and 'var_len_blocks_' of the blocks being read 
in GetNext().
   int fixed_len_blocks_index_;
   int var_len_blocks_index_;
 
-  /// If true, pin the next fixed and var-len blocks and delete the previous 
ones
-  /// in the next call to GetNext(). Set during the previous call to GetNext().
-  /// Not used if a run is already pinned.
-  bool pin_next_fixed_len_block_;
-  bool pin_next_var_len_block_;
+  /// If true, the last call to GetNext() reached the end of the previous 
fixed or
+  /// var-len block. The next call to GetNext() must increment 
'fixed_len_blocks_index_'
+  /// or 'var_len_blocks_index_'. It must also pin the next block if the run 
is unpinned.
+  bool end_of_fixed_len_block_;
+  bool end_of_var_len_block_;
 
   /// Offset into the current fixed length data block being processed.
   int fixed_len_block_offset_;
-}; // class Sorter::Run
+};
+
+/// Helper class used to iterate over tuples in a run during sorting.
+class Sorter::TupleIterator {
+ public:
+  /// Creates an iterator pointing at the tuple with the given 'index' in the 'run'.
+  /// The index can be in the range [0, run->num_tuples()]. If it is equal to
+  /// run->num_tuples(), the iterator points to one past the end of the run, so
+  /// invoking Prev() will cause the iterator to point at the last tuple in the run.
+  /// 'run' must be finalized.
+  TupleIterator(Sorter::Run* run, int64_t index);
+
+  /// Default constructor used for local variable. Produces invalid iterator 
that must
+  /// be assigned before use.
+  TupleIterator() : index_(-1), tuple_(NULL), buffer_start_index_(-1),
+      buffer_end_index_(-1), block_index_(-1) { }
+
+  /// Create an iterator pointing to the first tuple in the run.
+  static inline TupleIterator Begin(Sorter::Run* run) { return 
TupleIterator(run, 0); }
+
+  /// Create an iterator pointing one past the end of the run.
+  static inline TupleIterator End(Sorter::Run* run) {
+    return TupleIterator(run, run->num_tuples());
+  }
+
+  /// Increments 'index_' and sets 'tuple_' to point to the next tuple in the 
run.
+  /// Increments 'block_index_' and advances to the next block if the next 
tuple is in
+  /// the next block. Can be advanced one past the last tuple in the run, but 
is not
+  /// valid to dereference 'tuple_' in that case. 'run' and 'tuple_size' are 
passed as
+  /// arguments to avoid redundantly storing the same values in multiple 
iterators in
+  /// perf-critical algorithms.
+  inline void Next(Sorter::Run* run, int tuple_size);
+
+  /// The reverse of Next(). Can advance one before the first tuple in the 
run, but it is
+  /// invalid to dereference 'tuple_' in that case.
+  inline void Prev(Sorter::Run* run, int tuple_size);
+
+  inline int64_t index() const { return index_; }
+  inline Tuple* tuple() const { return reinterpret_cast<Tuple*>(tuple_); }
+  /// Returns current tuple in TupleRow format. The caller should not modify 
the row.
+  inline const TupleRow* row() const {
+    return reinterpret_cast<const TupleRow*>(&tuple_);
+  }
 
+ private:
+  // Move to the next block in the run (or do nothing if at end of run).
+  // This is the slow path for Next().
+  void NextBlock(Sorter::Run* run, int tuple_size);
+
+  // Move to the previous block in the run (or do nothing if at beginning of run).
+  // This is the slow path for Prev().
+  void PrevBlock(Sorter::Run* run, int tuple_size);
+
+  /// Index of the current tuple in the run.
+  /// Can be -1 or run->num_rows() if Next() or Prev() moves iterator outside 
of run.
+  int64_t index_;
+
+  /// Pointer to the current tuple.
+  /// Will be an invalid pointer outside of current buffer if Next() or Prev() 
moves
+  /// iterator outside of run.
+  uint8_t* tuple_;
+
+  /// Indices of start and end tuples of block at block_index_. I.e. the 
current block
+  /// has tuples with indices in range [buffer_start_index_, buffer_end_index).
+  int64_t buffer_start_index_;
+  int64_t buffer_end_index_;
+
+  /// Index into fixed_len_blocks_ of the block containing the current tuple.
+  /// If index_ is negative or past end of run, will point to the first or 
last block
+  /// in run respectively.
+  int block_index_;
+};
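
The iterator locates tuple 'index_' inside the run's fixed-length blocks with simple
block arithmetic: the block index is index / block_capacity and the byte offset within
that block is (index % block_capacity) * tuple_size. A standalone sketch of that
arithmetic (the example tuple and block sizes are assumptions for illustration):

#include <cassert>
#include <cstdint>

struct TupleAddress {
  int64_t block_index;   // which fixed-len block holds the tuple
  int64_t byte_offset;   // byte offset of the tuple within that block's buffer
};

inline TupleAddress LocateTuple(int64_t tuple_index, int64_t block_capacity,
    int64_t tuple_size) {
  return {tuple_index / block_capacity, (tuple_index % block_capacity) * tuple_size};
}

int main() {
  // E.g. 24-byte tuples in an 8 MB block: block_capacity tuples fit per block.
  const int64_t tuple_size = 24;
  const int64_t block_capacity = 8 * 1024 * 1024 / tuple_size;
  // The tuple just past the first block lands at offset tuple_size in block 1.
  TupleAddress a = LocateTuple(block_capacity + 1, block_capacity, tuple_size);
  assert(a.block_index == 1 && a.byte_offset == tuple_size);
  return 0;
}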
 
 /// Sorts a sequence of tuples from a run in place using a provided tuple 
comparator.
 /// Quick sort is used for sequences of tuples larger that 16 elements, and 
insertion sort
@@ -227,108 +394,17 @@ class Sorter::TupleSorter {
   ~TupleSorter();
 
   /// Performs a quicksort for tuples in 'run' followed by an insertion sort to
-  /// finish smaller blocks. Returns an error status if any error is 
encountered or
-  /// if the query is cancelled.
+  /// finish smaller blocks. Only valid to call if this is an initial run that 
has not
+  /// yet been sorted. Returns an error status if any error is encountered or 
if the
+  /// query is cancelled.
   Status Sort(Run* run);
 
  private:
   static const int INSERTION_THRESHOLD = 16;
 
-  /// Helper class used to iterate over tuples in a run during quick sort and 
insertion
-  /// sort.
-  class TupleIterator {
-   public:
-    /// Creates an iterator pointing at the tuple with the given 'index' in 
parent->run_.
-    /// The index can be in the range [0, run->num_tuples()]. If it is equal to
-    /// run->num_tuples(), the iterator points to one past the end of the run, 
so
-    /// invoking Prev() will cause the iterator to point at the last tuple in 
the run.
-    TupleIterator(TupleSorter* parent, int64_t index)
-      : parent_(parent),
-        index_(index),
-        current_tuple_(NULL) {
-      DCHECK_GE(index, 0);
-      DCHECK_LE(index, parent_->run_->num_tuples_);
-      // If the run is empty, only index_ and current_tuple_ are initialized.
-      if (parent_->run_->num_tuples_ == 0) return;
-      // If the iterator is initialized to past the end, set up buffer_start_ 
and
-      // block_index_ as if it pointing to the last tuple. Add tuple_size_ 
bytes to
-      // current_tuple_, so everything is correct when Prev() is invoked.
-      int past_end_bytes = 0;
-      if (UNLIKELY(index >= parent_->run_->num_tuples_)) {
-        past_end_bytes = parent->tuple_size_;
-        index_ = parent_->run_->num_tuples_;
-        index = index_ - 1;
-      }
-      block_index_ = index / parent->block_capacity_;
-      buffer_start_ = parent->run_->fixed_len_blocks_[block_index_]->buffer();
-      int block_offset = (index % parent->block_capacity_) * 
parent->tuple_size_;
-      current_tuple_ = buffer_start_ + block_offset + past_end_bytes;
-    }
-
-    /// Default constructor used for local variable.
-    TupleIterator()
-      : parent_(NULL),
-        index_(-1),
-        current_tuple_(NULL) { }
-
-    /// Sets 'current_tuple_' to point to the next tuple in the run. 
Increments 'block_index_'
-    /// and advances to the next block if the next tuple is in the next block.
-    /// Can be advanced one past the last tuple in the run, but is not valid to
-    /// dereference 'current_tuple_' in that case.
-    void Next() {
-      current_tuple_ += parent_->tuple_size_;
-      ++index_;
-      if (UNLIKELY(current_tuple_ > buffer_start_ + 
parent_->last_tuple_block_offset_ &&
-          index_ < parent_->run_->num_tuples_)) {
-       // Don't increment block index, etc. past the end.
-       ++block_index_;
-       DCHECK_LT(block_index_, parent_->run_->fixed_len_blocks_.size());
-       buffer_start_ = 
parent_->run_->fixed_len_blocks_[block_index_]->buffer();
-       current_tuple_ = buffer_start_;
-      }
-    }
-
-    /// The reverse of Next(). Can advance one before the first tuple in the 
run, but it
-    /// is invalid to dereference 'current_tuple_' in that case.
-    void Prev() {
-      current_tuple_ -= parent_->tuple_size_;
-      --index_;
-      if (UNLIKELY(current_tuple_ < buffer_start_ && index_ >= 0)) {
-        --block_index_;
-        DCHECK_GE(block_index_, 0);
-        buffer_start_ = 
parent_->run_->fixed_len_blocks_[block_index_]->buffer();
-        current_tuple_ = buffer_start_ + parent_->last_tuple_block_offset_;
-      }
-    }
-
-   private:
-    friend class TupleSorter;
-
-    /// Pointer to the tuple sorter.
-    TupleSorter* parent_;
-
-    /// Index of the current tuple in the run.
-    int64_t index_;
-
-    /// Pointer to the current tuple.
-    uint8_t* current_tuple_;
-
-    /// Start of the buffer containing current tuple.
-    uint8_t* buffer_start_;
-
-    /// Index into run_.fixed_len_blocks_ of the block containing the current 
tuple.
-    int block_index_;
-  };
-
   /// Size of the tuples in memory.
   const int tuple_size_;
 
-  /// Number of tuples per block in a run.
-  const int block_capacity_;
-
-  /// Offset in bytes of the last tuple in a block, calculated from block and 
tuple sizes.
-  const int last_tuple_block_offset_;
-
   /// Tuple comparator with method Less() that returns true if lhs < rhs.
   const TupleRowComparator comparator_;
 
@@ -343,8 +419,7 @@ class Sorter::TupleSorter {
   Run* run_;
 
   /// Temporarily allocated space to copy and swap tuples (Both are used in 
Partition()).
-  /// temp_tuple_ points to temp_tuple_buffer_. Owned by this TupleSorter 
instance.
-  TupleRow* temp_tuple_row_;
+  /// Owned by this TupleSorter instance.
   uint8_t* temp_tuple_buffer_;
   uint8_t* swap_buffer_;
 
@@ -356,48 +431,49 @@ class Sorter::TupleSorter {
   /// Wrapper around comparator_.Less(). Also call 
comparator_.FreeLocalAllocations()
   /// on every 'state_->batch_size()' invocations of comparator_.Less(). 
Returns true
   /// if 'lhs' is less than 'rhs'.
-  bool Less(TupleRow* lhs, TupleRow* rhs);
+  bool Less(const TupleRow* lhs, const TupleRow* rhs);
 
-  /// Performs an insertion sort for rows in the range [first, last) in a run.
-  /// Returns an error status if there is any error or if the query is 
cancelled.
-  Status InsertionSort(const TupleIterator& first, const TupleIterator& last);
+  /// Perform an insertion sort for rows in the range [begin, end) in a run.
+  /// Only valid to call for ranges of size at least 1.
+  Status InsertionSort(const TupleIterator& begin, const TupleIterator& end);
 
-  /// Partitions the sequence of tuples in the range [first, last) in a run 
into two
+  /// Partitions the sequence of tuples in the range [begin, end) in a run 
into two
   /// groups around the pivot tuple - i.e. tuples in first group are <= the 
pivot, and
   /// tuples in the second group are >= pivot. Tuples are swapped in place to 
create the
   /// groups and the index to the first element in the second group is 
returned in 'cut'.
   /// Return an error status if any error is encountered or if the query is 
cancelled.
-  Status Partition(TupleIterator first, TupleIterator last, Tuple* pivot,
+  Status Partition(TupleIterator begin, TupleIterator end, const Tuple* pivot,
       TupleIterator* cut);
 
-  /// Performs a quicksort of rows in the range [first, last) followed by 
insertion sort
+  /// Performs a quicksort of rows in the range [begin, end) followed by 
insertion sort
   /// for smaller groups of elements. Return an error status for any errors or 
if the
   /// query is cancelled.
-  Status SortHelper(TupleIterator first, TupleIterator last);
+  Status SortHelper(TupleIterator begin, TupleIterator end);
 
-  /// Select a pivot to partition [first, last).
-  Tuple* SelectPivot(TupleIterator first, TupleIterator last);
+  /// Select a pivot to partition [begin, end).
+  Tuple* SelectPivot(TupleIterator begin, TupleIterator end);
 
   /// Return median of three tuples according to the sort comparator.
   Tuple* MedianOfThree(Tuple* t1, Tuple* t2, Tuple* t3);
 
-  /// Swaps tuples pointed to by left and right using the swap buffer.
-  void Swap(uint8_t* left, uint8_t* right);
-}; // class TupleSorter
+  /// Swaps tuples pointed to by left and right using 'swap_tuple'.
+  static void Swap(Tuple* left, Tuple* right, Tuple* swap_tuple, int 
tuple_size);
+};
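
TupleSorter's algorithm, as described above, is a quicksort with a median-of-three
pivot that falls back to insertion sort for ranges of at most INSERTION_THRESHOLD
elements. A standalone sketch of that hybrid over plain ints (not Impala's tuple-based
implementation):

#include <algorithm>
#include <cassert>
#include <utility>
#include <vector>

static const int kInsertionThreshold = 16;

static void InsertionSort(std::vector<int>& v, int begin, int end) {
  for (int i = begin + 1; i < end; ++i) {
    int x = v[i];
    int j = i - 1;
    while (j >= begin && v[j] > x) { v[j + 1] = v[j]; --j; }
    v[j + 1] = x;
  }
}

static int MedianOfThree(int a, int b, int c) {
  return std::max(std::min(a, b), std::min(std::max(a, b), c));
}

static void SortHelper(std::vector<int>& v, int begin, int end) {
  while (end - begin > kInsertionThreshold) {
    int pivot = MedianOfThree(v[begin], v[begin + (end - begin) / 2], v[end - 1]);
    // Hoare-style partition around the pivot value.
    int i = begin, j = end - 1;
    while (true) {
      while (v[i] < pivot) ++i;
      while (v[j] > pivot) --j;
      if (i >= j) break;
      std::swap(v[i], v[j]);
      ++i; --j;
    }
    SortHelper(v, begin, j + 1);  // recurse on the left part
    begin = j + 1;                // iterate on the right part
  }
  InsertionSort(v, begin, end);   // small ranges finish with insertion sort
}

int main() {
  std::vector<int> v = {9, 3, 7, 1, 8, 2, 6, 0, 5, 4,
                        19, 13, 17, 11, 18, 12, 16, 10, 15, 14};
  SortHelper(v, 0, static_cast<int>(v.size()));
  assert(std::is_sorted(v.begin(), v.end()));
  return 0;
}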
 
 // Sorter::Run methods
 Sorter::Run::Run(Sorter* parent, TupleDescriptor* sort_tuple_desc,
-    bool materialize_slots)
+    bool initial_run)
   : sorter_(parent),
     sort_tuple_desc_(sort_tuple_desc),
     sort_tuple_size_(sort_tuple_desc->byte_size()),
+    block_capacity_(parent->block_mgr_->max_block_size() / sort_tuple_size_),
     has_var_len_slots_(sort_tuple_desc->HasVarlenSlots()),
-    materialize_slots_(materialize_slots),
-    is_sorted_(!materialize_slots),
-    is_pinned_(true),
+    initial_run_(initial_run),
+    is_pinned_(initial_run),
+    is_finalized_(false),
+    is_sorted_(!initial_run),
     var_len_copy_block_(NULL),
-    num_tuples_(0) {
-}
+    num_tuples_(0) { }
 
 Status Sorter::Run::Init() {
   BufferedBlockMgr::Block* block = NULL;
@@ -418,7 +494,8 @@ Status Sorter::Run::Init() {
       return status;
     }
     var_len_blocks_.push_back(block);
-    if (!is_sorted_) {
+    if (initial_run_) {
+      // Need additional var len block to reorder var len data in 
UnpinAllBlocks().
       RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock(
           sorter_->block_mgr_client_, NULL, &var_len_copy_block_));
       if (var_len_copy_block_ == NULL) {
@@ -428,25 +505,33 @@ Status Sorter::Run::Init() {
       }
     }
   }
-  if (!is_sorted_) sorter_->initial_runs_counter_->Add(1);
+  if (initial_run_) {
+    sorter_->initial_runs_counter_->Add(1);
+  } else {
+    sorter_->spilled_runs_counter_->Add(1);
+  }
   return Status::OK();
 }
 
-template <bool has_var_len_data>
-Status Sorter::Run::AddBatch(RowBatch* batch, int start_index, int* 
num_processed) {
+template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
+Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* 
num_processed) {
+  DCHECK(!is_finalized_);
   DCHECK(!fixed_len_blocks_.empty());
+  DCHECK_EQ(HAS_VAR_LEN_SLOTS, has_var_len_slots_);
+  DCHECK_EQ(INITIAL_RUN, initial_run_);
+
   *num_processed = 0;
   BufferedBlockMgr::Block* cur_fixed_len_block = fixed_len_blocks_.back();
 
-  DCHECK_EQ(materialize_slots_, !is_sorted_);
-  if (!materialize_slots_) {
-    // If materialize slots is false the run is being constructed for an
-    // intermediate merge and the sort tuples have already been materialized.
-    // The input row should have the same schema as the sort tuples.
+  if (!INITIAL_RUN) {
+    // For intermediate merges, the input row is the sort tuple.
     DCHECK_EQ(batch->row_desc().tuple_descriptors().size(), 1);
     DCHECK_EQ(batch->row_desc().tuple_descriptors()[0], sort_tuple_desc_);
   }
 
+  /// Keep initial unsorted runs pinned in memory so we can sort them.
+  const AddBlockMode add_mode = INITIAL_RUN ? KEEP_PREV_PINNED : UNPIN_PREV;
+
   // Input rows are copied/materialized into tuples allocated in 
fixed_len_blocks_.
   // The variable length column data are copied into blocks stored in 
var_len_blocks_.
   // Input row processing is split into two loops.
@@ -469,8 +554,8 @@ Status Sorter::Run::AddBatch(RowBatch* batch, int 
start_index, int* num_processe
       int total_var_len = 0;
       TupleRow* input_row = batch->GetRow(cur_input_index);
       Tuple* new_tuple = 
cur_fixed_len_block->Allocate<Tuple>(sort_tuple_size_);
-      if (materialize_slots_) {
-        new_tuple->MaterializeExprs<has_var_len_data, true>(input_row, 
*sort_tuple_desc_,
+      if (INITIAL_RUN) {
+        new_tuple->MaterializeExprs<HAS_VAR_LEN_SLOTS, true>(input_row, 
*sort_tuple_desc_,
             sorter_->sort_tuple_slot_expr_ctxs_, NULL, &string_values, 
&total_var_len);
         if (total_var_len > sorter_->block_mgr_->max_block_size()) {
           return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, Substitute(
@@ -479,17 +564,17 @@ Status Sorter::Run::AddBatch(RowBatch* batch, int 
start_index, int* num_processe
         }
       } else {
         memcpy(new_tuple, input_row->GetTuple(0), sort_tuple_size_);
-        if (has_var_len_data) {
+        if (HAS_VAR_LEN_SLOTS) {
           CollectNonNullVarSlots(new_tuple, &string_values, &total_var_len);
         }
       }
 
-      if (has_var_len_data) {
+      if (HAS_VAR_LEN_SLOTS) {
         DCHECK_GT(var_len_blocks_.size(), 0);
         BufferedBlockMgr::Block* cur_var_len_block = var_len_blocks_.back();
         if (cur_var_len_block->BytesRemaining() < total_var_len) {
           bool added;
-          RETURN_IF_ERROR(TryAddBlock(&var_len_blocks_, &added));
+          RETURN_IF_ERROR(TryAddBlock(add_mode, &var_len_blocks_, &added));
           if (added) {
             cur_var_len_block = var_len_blocks_.back();
           } else {
@@ -502,13 +587,12 @@ Status Sorter::Run::AddBatch(RowBatch* batch, int 
start_index, int* num_processe
 
         // Sorting of tuples containing array values is not implemented. The 
planner
         // combined with projection should guarantee that none are in each 
tuple.
-        for (const SlotDescriptor* collection_slot:
-             sort_tuple_desc_->collection_slots()) {
-          DCHECK(new_tuple->IsNull(collection_slot->null_indicator_offset()));
+        for (const SlotDescriptor* coll_slot: 
sort_tuple_desc_->collection_slots()) {
+          DCHECK(new_tuple->IsNull(coll_slot->null_indicator_offset()));
         }
 
         uint8_t* var_data_ptr = 
cur_var_len_block->Allocate<uint8_t>(total_var_len);
-        if (materialize_slots_) {
+        if (INITIAL_RUN) {
           CopyVarLenData(string_values, var_data_ptr);
         } else {
           DCHECK_EQ(var_len_blocks_.back(), cur_var_len_block);
@@ -525,51 +609,60 @@ Status Sorter::Run::AddBatch(RowBatch* batch, int 
start_index, int* num_processe
     // tuples. If the run is already too long, return.
     if (cur_input_index < batch->num_rows()) {
       bool added;
-      RETURN_IF_ERROR(TryAddBlock(&fixed_len_blocks_, &added));
-      if (added) {
-        cur_fixed_len_block = fixed_len_blocks_.back();
-      } else {
-        return Status::OK();
-      }
+      RETURN_IF_ERROR(TryAddBlock(add_mode, &fixed_len_blocks_, &added));
+      if (!added) return Status::OK();
+      cur_fixed_len_block = fixed_len_blocks_.back();
     }
   }
   return Status::OK();
 }
 
-void Sorter::Run::TransferResources(RowBatch* row_batch) {
-  DCHECK(row_batch != NULL);
-  for (BufferedBlockMgr::Block* block: fixed_len_blocks_) {
-    if (block != NULL) row_batch->AddBlock(block);
-  }
-  fixed_len_blocks_.clear();
-  for (BufferedBlockMgr::Block* block: var_len_blocks_) {
-    if (block != NULL) row_batch->AddBlock(block);
+Status Sorter::Run::FinalizeInput() {
+  DCHECK(!is_finalized_);
+
+  RETURN_IF_ERROR(FinalizeBlocks(&fixed_len_blocks_));
+  if (has_var_len_slots_) {
+    RETURN_IF_ERROR(FinalizeBlocks(&var_len_blocks_));
   }
-  var_len_blocks_.clear();
-  if (var_len_copy_block_ != NULL) {
-    row_batch->AddBlock(var_len_copy_block_);
-    var_len_copy_block_ = NULL;
+  is_finalized_ = true;
+  return Status::OK();
+}
+
+Status Sorter::Run::FinalizeBlocks(vector<BufferedBlockMgr::Block*>* blocks) {
+  DCHECK_GT(blocks->size(), 0);
+  BufferedBlockMgr::Block* last_block = blocks->back();
+  if (last_block->valid_data_len() > 0) {
+    DCHECK_EQ(initial_run_, is_pinned_);
+    if (!is_pinned_) {
+      // Unpin the last block of this unpinned run. We've finished writing the 
run so
+      // all blocks in the run can now be unpinned.
+      RETURN_IF_ERROR(last_block->Unpin());
+    }
+  } else {
+    last_block->Delete();
+    blocks->pop_back();
   }
+  return Status::OK();
 }
 
 void Sorter::Run::DeleteAllBlocks() {
-  for (BufferedBlockMgr::Block* block: fixed_len_blocks_) {
-    if (block != NULL) block->Delete();
-  }
-  fixed_len_blocks_.clear();
-  for (BufferedBlockMgr::Block* block: var_len_blocks_) {
-    if (block != NULL) block->Delete();
-  }
-  var_len_blocks_.clear();
-  if (var_len_copy_block_ != NULL) {
-    var_len_copy_block_->Delete();
-    var_len_copy_block_ = NULL;
-  }
+  DeleteAndClearBlocks(&fixed_len_blocks_);
+  DeleteAndClearBlocks(&var_len_blocks_);
+  if (var_len_copy_block_ != NULL) var_len_copy_block_->Delete();
+  var_len_copy_block_ = NULL;
 }
 
 Status Sorter::Run::UnpinAllBlocks() {
+  DCHECK(is_sorted_);
+  DCHECK(initial_run_);
+  DCHECK(is_pinned_);
+  DCHECK(is_finalized_);
+  // A list of var len blocks to replace 'var_len_blocks_'. Note that after we 
are done
+  // we may have a different number of blocks, because internal fragmentation 
may leave
+  // slightly different amounts of wasted space at the end of each block.
   vector<BufferedBlockMgr::Block*> sorted_var_len_blocks;
   sorted_var_len_blocks.reserve(var_len_blocks_.size());
+
   vector<StringValue*> string_values;
   int total_var_len;
   string_values.reserve(sort_tuple_desc_->string_slots().size());
@@ -578,12 +671,22 @@ Status Sorter::Run::UnpinAllBlocks() {
     DCHECK(var_len_copy_block_ != NULL);
     sorted_var_len_blocks.push_back(var_len_copy_block_);
     cur_sorted_var_len_block = var_len_copy_block_;
+    // Set var_len_copy_block_ to NULL since it was moved to var_len_blocks_.
+    var_len_copy_block_ = NULL;
+  } else if (has_var_len_slots_) {
+    // If we don't have any var-len blocks, clean up the copy block.
+    DCHECK(var_len_copy_block_ != NULL);
+    var_len_copy_block_->Delete();
+    var_len_copy_block_ = NULL;
   } else {
     DCHECK(var_len_copy_block_ == NULL);
   }
 
   for (int i = 0; i < fixed_len_blocks_.size(); ++i) {
     BufferedBlockMgr::Block* cur_fixed_block = fixed_len_blocks_[i];
+    // Skip converting the pointers if no var-len slots, or if all the values 
are null
+    // or zero-length. This will possibly leave zero-length pointers pointing 
to
+    // arbitrary memory, but zero-length data cannot be dereferenced anyway.
     if (HasVarLenBlocks()) {
       for (int block_offset = 0; block_offset < 
cur_fixed_block->valid_data_len();
           block_offset += sort_tuple_size_) {
@@ -593,8 +696,8 @@ Status Sorter::Run::UnpinAllBlocks() {
         DCHECK(cur_sorted_var_len_block != NULL);
         if (cur_sorted_var_len_block->BytesRemaining() < total_var_len) {
           bool added;
-          RETURN_IF_ERROR(TryAddBlock(&sorted_var_len_blocks, &added));
-          DCHECK(added);
+          RETURN_IF_ERROR(TryAddBlock(UNPIN_PREV, &sorted_var_len_blocks, &added));
+          DCHECK(added) << "TryAddBlock() with UNPIN_PREV should not fail to add";
           cur_sorted_var_len_block = sorted_var_len_blocks.back();
         }
         uint8_t* var_data_ptr =
@@ -607,36 +710,33 @@ Status Sorter::Run::UnpinAllBlocks() {
     RETURN_IF_ERROR(cur_fixed_block->Unpin());
   }
 
-  if (has_var_len_slots_ && var_len_blocks_.size() > 0) {
+  if (HasVarLenBlocks()) {
     DCHECK_GT(sorted_var_len_blocks.back()->valid_data_len(), 0);
     RETURN_IF_ERROR(sorted_var_len_blocks.back()->Unpin());
   }
 
   // Clear var_len_blocks_ and replace with it with the contents of 
sorted_var_len_blocks
-  for (BufferedBlockMgr::Block* var_block: var_len_blocks_) {
-    var_block->Delete();
-  }
-  var_len_blocks_.clear();
+  DeleteAndClearBlocks(&var_len_blocks_);
   sorted_var_len_blocks.swap(var_len_blocks_);
-  // Set var_len_copy_block_ to NULL since it's now in var_len_blocks_ and is 
no longer
-  // needed.
-  var_len_copy_block_ = NULL;
   is_pinned_ = false;
+  sorter_->spilled_runs_counter_->Add(1);
   return Status::OK();
 }
 
 Status Sorter::Run::PrepareRead() {
+  DCHECK(is_finalized_);
+  DCHECK(is_sorted_);
+
   fixed_len_blocks_index_ = 0;
   fixed_len_block_offset_ = 0;
   var_len_blocks_index_ = 0;
-  pin_next_fixed_len_block_ = pin_next_var_len_block_ = false;
+  end_of_fixed_len_block_ = end_of_var_len_block_ = fixed_len_blocks_.empty();
   num_tuples_returned_ = 0;
 
   buffered_batch_.reset(new RowBatch(*sorter_->output_row_desc_,
       sorter_->state_->batch_size(), sorter_->mem_tracker_));
 
-  // If the run is pinned, merge is not invoked, so buffered_batch_ is not 
needed
-  // and the individual blocks do not need to be pinned.
+  // If the run is pinned, all blocks are already pinned, so we're ready to 
read.
   if (is_pinned_) return Status::OK();
 
   // Attempt to pin the first fixed and var-length blocks. In either case, 
pinning may
@@ -653,7 +753,7 @@ Status Sorter::Run::PrepareRead() {
     }
   }
 
-  if (has_var_len_slots_ && var_len_blocks_.size() > 0) {
+  if (HasVarLenBlocks()) {
     bool pinned;
     RETURN_IF_ERROR(var_len_blocks_[0]->Pin(&pinned));
     // Temporary work-around for IMPALA-1590. Fail the query with OOM rather 
than
@@ -668,86 +768,72 @@ Status Sorter::Run::PrepareRead() {
 }
 
 Status Sorter::Run::GetNextBatch(RowBatch** output_batch) {
-  if (buffered_batch_.get() != NULL) {
-    buffered_batch_->Reset();
-    // Fill more rows into buffered_batch_.
-    bool eos;
-    if (has_var_len_slots_ && !is_pinned_) {
-      RETURN_IF_ERROR(GetNext<true>(buffered_batch_.get(), &eos));
-      if (buffered_batch_->num_rows() == 0 && !eos) {
-        // No rows were filled because GetNext() had to read the next var-len 
block
-        // Call GetNext() again.
-        RETURN_IF_ERROR(GetNext<true>(buffered_batch_.get(), &eos));
-      }
-    } else {
-      RETURN_IF_ERROR(GetNext<false>(buffered_batch_.get(), &eos));
-    }
-    DCHECK(eos || buffered_batch_->num_rows() > 0);
-    if (eos) {
-      // No rows are filled in GetNext() on eos, so this is safe.
-      DCHECK_EQ(buffered_batch_->num_rows(), 0);
-      buffered_batch_.reset();
-      // The merge is complete. Delete the last blocks in the run.
-      fixed_len_blocks_.back()->Delete();
-      fixed_len_blocks_[fixed_len_blocks_.size() - 1] = NULL;
-      if (HasVarLenBlocks()) {
-        var_len_blocks_.back()->Delete();
-        var_len_blocks_[var_len_blocks_.size() - 1] = NULL;
-      }
-    }
+  DCHECK(buffered_batch_ != NULL);
+  buffered_batch_->Reset();
+  // Fill more rows into buffered_batch_.
+  bool eos;
+  if (HasVarLenBlocks() && !is_pinned_) {
+    RETURN_IF_ERROR(GetNext<true>(buffered_batch_.get(), &eos));
+  } else {
+    RETURN_IF_ERROR(GetNext<false>(buffered_batch_.get(), &eos));
   }
 
-  // *output_batch == NULL indicates eos.
+  if (eos) {
+    // Setting output_batch to NULL signals eos to the caller, so GetNext() is 
not
+    // allowed to attach resources to the batch on eos.
+    DCHECK_EQ(buffered_batch_->num_rows(), 0);
+    DCHECK_EQ(buffered_batch_->num_blocks(), 0);
+    *output_batch = NULL;
+    return Status::OK();
+  }
   *output_batch = buffered_batch_.get();
   return Status::OK();
 }
 
-template <bool convert_offset_to_ptr>
+template <bool CONVERT_OFFSET_TO_PTR>
 Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
-  if (fixed_len_blocks_index_ == fixed_len_blocks_.size()) {
+  // Var-len offsets are converted only when reading var-len data from 
unpinned runs.
+  if (HasVarLenBlocks()) DCHECK_EQ(!is_pinned_, CONVERT_OFFSET_TO_PTR);
+  // We shouldn't convert var len offsets if there are no blocks, since in 
that case
+  // they must all be null or zero-length strings, which don't point into a 
valid block.
+  if (!HasVarLenBlocks()) DCHECK(!CONVERT_OFFSET_TO_PTR);
+
+  if (end_of_fixed_len_block_ &&
+      fixed_len_blocks_index_ >= static_cast<int>(fixed_len_blocks_.size()) - 1) {
+    if (is_pinned_) {
+      // All blocks were previously attached to output batches. GetNextBatch() 
assumes
+      // that we don't attach resources to the batch on eos.
+      DCHECK_EQ(NumNonNullBlocks(fixed_len_blocks_), 0);
+      DCHECK_EQ(NumNonNullBlocks(var_len_blocks_), 0);
+    } else {
+      // We held onto the last fixed or var len blocks without transferring 
them to the
+      // caller. We signalled MarkNeedToReturn() to the caller, so we can 
safely delete
+      // them now to free memory.
+      if (!fixed_len_blocks_.empty()) DCHECK_EQ(NumNonNullBlocks(fixed_len_blocks_), 1);
+      if (!var_len_blocks_.empty()) DCHECK_EQ(NumNonNullBlocks(var_len_blocks_), 1);
+    }
+    DeleteAllBlocks();
     *eos = true;
     DCHECK_EQ(num_tuples_returned_, num_tuples_);
     return Status::OK();
-  } else {
-    *eos = false;
   }
 
-  BufferedBlockMgr::Block* fixed_len_block = 
fixed_len_blocks_[fixed_len_blocks_index_];
-
-  if (!is_pinned_) {
-    // Pin the next block and delete the previous if set in the previous call 
to
-    // GetNext().
-    if (pin_next_fixed_len_block_) {
-      fixed_len_blocks_[fixed_len_blocks_index_ - 1]->Delete();
-      fixed_len_blocks_[fixed_len_blocks_index_ - 1] = NULL;
-      bool pinned;
-      RETURN_IF_ERROR(fixed_len_block->Pin(&pinned));
-      // Temporary work-around for IMPALA-2344. Fail the query with OOM rather 
than
-      // DCHECK in case block pin fails.
-      if (!pinned) {
-        Status status = Status::MemLimitExceeded();
-        status.AddDetail(Substitute(PIN_FAILED_ERROR_MSG, "fixed"));
-        return status;
-      }
-      pin_next_fixed_len_block_ = false;
-    }
-    if (pin_next_var_len_block_) {
-      var_len_blocks_[var_len_blocks_index_ - 1]->Delete();
-      var_len_blocks_[var_len_blocks_index_ - 1] = NULL;
-      bool pinned;
-      RETURN_IF_ERROR(var_len_blocks_[var_len_blocks_index_]->Pin(&pinned));
-      // Temporary work-around for IMPALA-2344. Fail the query with OOM rather 
than
-      // DCHECK in case block pin fails.
-      if (!pinned) {
-        Status status = Status::MemLimitExceeded();
-        status.AddDetail(Substitute(PIN_FAILED_ERROR_MSG, "variable"));
-        return status;
-      }
-      pin_next_var_len_block_ = false;
-    }
+  // Advance the fixed or var len block if we reached the end in the previous 
call to
+  // GetNext().
+  if (end_of_fixed_len_block_) {
+    RETURN_IF_ERROR(PinNextReadBlock(&fixed_len_blocks_, 
fixed_len_blocks_index_));
+    ++fixed_len_blocks_index_;
+    fixed_len_block_offset_ = 0;
+    end_of_fixed_len_block_ = false;
+  }
+  if (end_of_var_len_block_) {
+    RETURN_IF_ERROR(PinNextReadBlock(&var_len_blocks_, var_len_blocks_index_));
+    ++var_len_blocks_index_;
+    end_of_var_len_block_ = false;
   }
 
-  // GetNext fills rows into the output batch until a block boundary is 
reached.
+  // Fills rows into the output batch until a block boundary is reached.
+  BufferedBlockMgr::Block* fixed_len_block = 
fixed_len_blocks_[fixed_len_blocks_index_];
   DCHECK(fixed_len_block != NULL);
   while (!output_batch->AtCapacity() &&
       fixed_len_block_offset_ < fixed_len_block->valid_data_len()) {
@@ -755,57 +841,76 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* 
eos) {
     Tuple* input_tuple = reinterpret_cast<Tuple*>(
         fixed_len_block->buffer() + fixed_len_block_offset_);
 
-    if (convert_offset_to_ptr) {
-      // Convert the offsets in the var-len slots in input_tuple back to 
pointers.
-      const vector<SlotDescriptor*>& string_slots = 
sort_tuple_desc_->string_slots();
-      for (int i = 0; i < string_slots.size(); ++i) {
-        SlotDescriptor* slot_desc = string_slots[i];
-        if (input_tuple->IsNull(slot_desc->null_indicator_offset())) continue;
-
-        DCHECK(slot_desc->type().IsVarLenStringType());
-        StringValue* value = reinterpret_cast<StringValue*>(
-            input_tuple->GetSlot(slot_desc->tuple_offset()));
-
-        // packed_offset includes the block index in the upper 32 bits and the 
block
-        // offset in the lower 32 bits. See CopyVarLenDataConvertOffset().
-        uint64_t packed_offset = reinterpret_cast<uint64_t>(value->ptr);
-        int block_index = packed_offset >> 32;
-        int block_offset = packed_offset & 0xFFFFFFFF;
-
-        if (block_index > var_len_blocks_index_) {
-          // We've reached the block boundary for the current var-len block.
-          // This tuple will be returned in the next call to GetNext().
-          DCHECK_EQ(block_index, var_len_blocks_index_ + 1);
-          DCHECK_EQ(block_offset, 0);
-          DCHECK_EQ(i, 0);
-          var_len_blocks_index_ = block_index;
-          pin_next_var_len_block_ = true;
-          break;
-        } else {
-          DCHECK_EQ(block_index, var_len_blocks_index_) << packed_offset;
-          // Calculate the address implied by the offset and assign it.
-          value->ptr = reinterpret_cast<char*>(
-              var_len_blocks_[var_len_blocks_index_]->buffer() + block_offset);
-        } // if (block_index > var_len_blocks_index_)
-      } // for (int i = 0; i < string_slots.size(); ++i)
-
-      // The var-len data is in the next block, so end this call to GetNext().
-      if (pin_next_var_len_block_) break;
-    } // if (convert_offset_to_ptr)
-
-    int output_row_idx = output_batch->AddRow();
-    output_batch->GetRow(output_row_idx)->SetTuple(0, input_tuple);
+    if (CONVERT_OFFSET_TO_PTR && !ConvertOffsetsToPtrs(input_tuple)) {
+      // The var-len data is in the next block. We are done with the current 
block, so
+      // return rows we've accumulated so far and advance to the next block in 
the next
+      // GetNext() call. This is needed for the unpinned case where we can't 
pin the next
+      // block before we delete the current block.
+      if (is_pinned_) {
+        // Attach block to batch. We don't need the block any more and we don't need to
+        // reclaim the block's memory, since the sorted data is already all in memory.
+        // The caller can delete the block when it wants to.
+        output_batch->AddBlock(var_len_blocks_[var_len_blocks_index_]);
+        var_len_blocks_[var_len_blocks_index_] = NULL;
+      } else {
+        // To iterate over unpinned runs, we need to exchange this block for 
the next
+        // in the next GetNext() call, so we need to hold onto the block and 
signal to
+        // the caller that the block is going to be deleted.
+        output_batch->MarkNeedToReturn();
+      }
+      end_of_var_len_block_ = true;
+      break;
+    }
+    output_batch->GetRow(output_batch->AddRow())->SetTuple(0, input_tuple);
     output_batch->CommitLastRow();
     fixed_len_block_offset_ += sort_tuple_size_;
     ++num_tuples_returned_;
   }
 
-  // Reached the block boundary, need to move to the next block.
   if (fixed_len_block_offset_ >= fixed_len_block->valid_data_len()) {
-    pin_next_fixed_len_block_ = true;
-    ++fixed_len_blocks_index_;
-    fixed_len_block_offset_ = 0;
+    // Reached the block boundary, need to move to the next block.
+    if (is_pinned_) {
+      // Attach block to batch. Caller can delete the block when it wants to.
+      output_batch->AddBlock(fixed_len_blocks_[fixed_len_blocks_index_]);
+      fixed_len_blocks_[fixed_len_blocks_index_] = NULL;
+
+      // Also attach the last var-len block at eos, since it won't be attached elsewhere.
+      if (HasVarLenBlocks() &&
+          fixed_len_blocks_index_ == fixed_len_blocks_.size() - 1) {
+        output_batch->AddBlock(var_len_blocks_[var_len_blocks_index_]);
+        var_len_blocks_[var_len_blocks_index_] = NULL;
+      }
+    } else {
+      // To iterate over unpinned runs, we need to exchange this block for the next
+      // one in the next GetNext() call, so we hold onto the block and signal to
+      // the caller that the block is going to be deleted.
+      output_batch->MarkNeedToReturn();
+    }
+    end_of_fixed_len_block_ = true;
   }
+  *eos = false;
+  return Status::OK();
+}
+
+Status Sorter::Run::PinNextReadBlock(vector<BufferedBlockMgr::Block*>* blocks,
+    int block_index) {
+  DCHECK_GE(block_index, 0);
+  DCHECK_LT(block_index, blocks->size() - 1);
+  BufferedBlockMgr::Block* curr_block = (*blocks)[block_index];
+  BufferedBlockMgr::Block* next_block = (*blocks)[block_index + 1];
+  DCHECK_EQ(is_pinned_, next_block->is_pinned());
+  if (is_pinned_) {
+    // The current block was attached to a batch and 'next_block' is already pinned.
+    DCHECK(curr_block == NULL);
+    return Status::OK();
+  }
+  bool pinned;
+  // Atomically delete the previous block and pin this one. Should not fail due to lack
+  // of memory. Pin() deletes the block even in error cases, so we need to remove it from
+  // the vector first to avoid an inconsistent state.
+  (*blocks)[block_index] = NULL;
+  RETURN_IF_ERROR(next_block->Pin(&pinned, curr_block, false));
+  DCHECK(pinned) << "Atomic delete and pin should not fail without error.";
   return Status::OK();
 }
 
@@ -823,24 +928,26 @@ void Sorter::Run::CollectNonNullVarSlots(Tuple* src,
   }
 }
 
-Status Sorter::Run::TryAddBlock(vector<BufferedBlockMgr::Block*>* block_sequence,
-    bool* added) {
+Status Sorter::Run::TryAddBlock(AddBlockMode mode,
+    vector<BufferedBlockMgr::Block*>* block_sequence, bool* added) {
   DCHECK(!block_sequence->empty());
-  BufferedBlockMgr::Block* last_block = block_sequence->back();
-  if (!is_sorted_) {
-    sorter_->sorted_data_size_->Add(last_block->valid_data_len());
-    last_block = NULL;
+  BufferedBlockMgr::Block* prev_block;
+  if (mode == KEEP_PREV_PINNED) {
+    prev_block = NULL;
   } else {
-    // If the run is sorted, we will unpin the last block and extend the run.
+    DCHECK(mode == UNPIN_PREV);
+    // Swap the prev block with the next, to guarantee success.
+    prev_block = block_sequence->back();
   }
 
   BufferedBlockMgr::Block* new_block;
   RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock(
-      sorter_->block_mgr_client_, last_block, &new_block));
+      sorter_->block_mgr_client_, prev_block, &new_block));
   if (new_block != NULL) {
     *added = true;
     block_sequence->push_back(new_block);
   } else {
+    DCHECK_EQ(mode, KEEP_PREV_PINNED);
     *added = false;
   }
   return Status::OK();
@@ -872,17 +979,136 @@ void Sorter::Run::CopyVarLenDataConvertOffset(const vector<StringValue*>& string
   }
 }
 
-// Sorter::TupleSorter methods.
+bool Sorter::Run::ConvertOffsetsToPtrs(Tuple* tuple) {
+  // We need to be careful to handle the case where var_len_blocks_ is empty,
+  // e.g. if all strings are NULL.
+  uint8_t* block_start = var_len_blocks_.empty() ? NULL :
+      var_len_blocks_[var_len_blocks_index_]->buffer();
+
+  const vector<SlotDescriptor*>& string_slots = sort_tuple_desc_->string_slots();
+  for (int i = 0; i < string_slots.size(); ++i) {
+    SlotDescriptor* slot_desc = string_slots[i];
+    if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
+
+    DCHECK(slot_desc->type().IsVarLenStringType());
+    StringValue* value = reinterpret_cast<StringValue*>(
+        tuple->GetSlot(slot_desc->tuple_offset()));
+    // packed_offset includes the block index in the upper 32 bits and the block
+    // offset in the lower 32 bits. See CopyVarLenDataConvertOffset().
+    uint64_t packed_offset = reinterpret_cast<uint64_t>(value->ptr);
+    int block_index = packed_offset >> 32;
+    int block_offset = packed_offset & 0xFFFFFFFF;
+
+    if (block_index > var_len_blocks_index_) {
+      // We've reached the block boundary for the current var-len block.
+      // This tuple will be returned in the next call to GetNext().
+      DCHECK_GE(block_index, 0);
+      DCHECK_LE(block_index, var_len_blocks_.size());
+      DCHECK_EQ(block_index, var_len_blocks_index_ + 1);
+      DCHECK_EQ(block_offset, 0); // The data is the first thing in the next block.
+      DCHECK_EQ(i, 0); // Var len data for tuple shouldn't be split across blocks.
+      return false;
+    }
+
+    DCHECK_EQ(block_index, var_len_blocks_index_);
+    if (var_len_blocks_.empty()) {
+      DCHECK_EQ(value->len, 0);
+    } else {
+      DCHECK_LE(block_offset + value->len, var_len_blocks_[block_index]->valid_data_len());
+    }
+    // Calculate the address implied by the offset and assign it. May be NULL for
+    // zero-length strings if there are no blocks in the run since block_start is NULL.
+    DCHECK(block_start != NULL || block_offset == 0);
+    value->ptr = reinterpret_cast<char*>(block_start + block_offset);
+  }
+  return true;
+}
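As an illustrative aside (not part of the patch): the packing scheme decoded above, and produced by CopyVarLenDataConvertOffset(), stores the var-len block index in the upper 32 bits and the byte offset within that block in the lower 32 bits of the pointer-sized slot. A minimal standalone sketch of the round trip, with hypothetical helper names:

#include <cassert>
#include <cstdint>

// Illustrative only: pack a (block index, block offset) pair into one 64-bit
// value, mirroring the upper/lower 32-bit split described in the comments.
uint64_t PackOffset(uint32_t block_index, uint32_t block_offset) {
  return (static_cast<uint64_t>(block_index) << 32) | block_offset;
}

void UnpackOffset(uint64_t packed, uint32_t* block_index, uint32_t* block_offset) {
  *block_index = static_cast<uint32_t>(packed >> 32);
  *block_offset = static_cast<uint32_t>(packed & 0xFFFFFFFF);
}

int main() {
  uint32_t index, offset;
  UnpackOffset(PackOffset(3, 128), &index, &offset);
  assert(index == 3 && offset == 128);
  return 0;
}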
+
+int64_t Sorter::Run::TotalBytes() const {
+  int64_t total_bytes = 0;
+  for (BufferedBlockMgr::Block* block: fixed_len_blocks_) {
+    if (block != NULL) total_bytes += block->valid_data_len();
+  }
+
+  for (BufferedBlockMgr::Block* block: var_len_blocks_) {
+    if (block != NULL) total_bytes += block->valid_data_len();
+  }
+  return total_bytes;
+}
+
+Sorter::TupleIterator::TupleIterator(Sorter::Run* run, int64_t index)
+    : index_(index), tuple_(NULL) {
+  DCHECK(run->is_finalized_);
+  DCHECK_GE(index, 0);
+  DCHECK_LE(index, run->num_tuples());
+  // If the run is empty, only index_ and tuple_ are initialized.
+  if (run->num_tuples() == 0) {
+    DCHECK_EQ(index, 0);
+    return;
+  }
+
+  const int tuple_size = run->sort_tuple_size_;
+  int block_offset;
+  if (UNLIKELY(index == run->num_tuples())) {
+    // If the iterator is initialized past the end, set up buffer_start_index_,
+    // 'buffer_end_index_' and 'block_index_' for the last block, then set 'tuple' to
+    // 'tuple_size' bytes past the last tuple, so everything is correct when Prev() is
+    // invoked.
+    block_index_ = run->fixed_len_blocks_.size() - 1;
+    block_offset = ((index - 1) % run->block_capacity_) * tuple_size + tuple_size;
+  } else {
+    block_index_ = index / run->block_capacity_;
+    block_offset = (index % run->block_capacity_) * tuple_size;
+  }
+  buffer_start_index_ = block_index_ * run->block_capacity_;
+  buffer_end_index_ = buffer_start_index_ + run->block_capacity_;
+  tuple_ = run->fixed_len_blocks_[block_index_]->buffer() + block_offset;
+}
+
+void Sorter::TupleIterator::Next(Sorter::Run* run, int tuple_size) {
+  DCHECK_LT(index_, run->num_tuples()) << "Can only advance one past end of run";
+  tuple_ += tuple_size;
+  ++index_;
+  if (UNLIKELY(index_ >= buffer_end_index_)) NextBlock(run, tuple_size);
+}
+
+void Sorter::TupleIterator::NextBlock(Sorter::Run* run, int tuple_size) {
+  // When moving after the last tuple, stay at the last block.
+  if (index_ >= run->num_tuples()) return;
+  ++block_index_;
+  DCHECK_LT(block_index_, run->fixed_len_blocks_.size());
+  buffer_start_index_ = block_index_ * run->block_capacity_;
+  DCHECK_EQ(index_, buffer_start_index_);
+  buffer_end_index_ = buffer_start_index_ + run->block_capacity_;
+  tuple_ = run->fixed_len_blocks_[block_index_]->buffer();
+}
+
+void Sorter::TupleIterator::Prev(Sorter::Run* run, int tuple_size) {
+  DCHECK_GE(index_, 0) << "Can only advance one before start of run";
+  tuple_ -= tuple_size;
+  --index_;
+  if (UNLIKELY(index_ < buffer_start_index_)) PrevBlock(run, tuple_size);
+}
+
+void Sorter::TupleIterator::PrevBlock(Sorter::Run* run, int tuple_size) {
+  // When moving before the first tuple, stay at the first block.
+  if (index_ < 0) return;
+  --block_index_;
+  DCHECK_GE(block_index_, 0);
+  buffer_start_index_ = block_index_ * run->block_capacity_;
+  buffer_end_index_ = buffer_start_index_ + run->block_capacity_;
+  DCHECK_EQ(index_, buffer_end_index_ - 1);
+  int last_tuple_block_offset = run->sort_tuple_size_ * (run->block_capacity_ - 1);
+  tuple_ = run->fixed_len_blocks_[block_index_]->buffer() + last_tuple_block_offset;
+}
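As an illustrative aside (not part of the patch): the iterator's bookkeeping reduces to integer division and modulo on the run-wide tuple index. A small sketch of that mapping, using made-up capacity and tuple-size values:

#include <cassert>
#include <cstdint>

// Illustrative only: map a run-wide tuple index to a (block index, byte
// offset within the block) pair, the same arithmetic TupleIterator uses.
struct TuplePos {
  int64_t block_index;
  int64_t block_offset;
};

TuplePos ToPos(int64_t index, int64_t block_capacity, int64_t tuple_size) {
  return TuplePos{index / block_capacity, (index % block_capacity) * tuple_size};
}

int main() {
  // With 1000 tuples per block and 16-byte tuples, tuple 2500 lives in block 2
  // at byte offset 500 * 16 = 8000.
  TuplePos pos = ToPos(2500, 1000, 16);
  assert(pos.block_index == 2 && pos.block_offset == 8000);
  return 0;
}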
+
 Sorter::TupleSorter::TupleSorter(const TupleRowComparator& comp, int64_t block_size,
     int tuple_size, RuntimeState* state)
   : tuple_size_(tuple_size),
-    block_capacity_(block_size / tuple_size),
-    last_tuple_block_offset_(tuple_size * ((block_size / tuple_size) - 1)),
     comparator_(comp),
     num_comparisons_till_free_(state->batch_size()),
     state_(state) {
   temp_tuple_buffer_ = new uint8_t[tuple_size];
-  temp_tuple_row_ = reinterpret_cast<TupleRow*>(&temp_tuple_buffer_);
   swap_buffer_ = new uint8_t[tuple_size];
 }
 
@@ -891,15 +1117,7 @@ Sorter::TupleSorter::~TupleSorter() {
   delete[] swap_buffer_;
 }
 
-Status Sorter::TupleSorter::Sort(Run* run) {
-  run_ = run;
-  RETURN_IF_ERROR(
-      SortHelper(TupleIterator(this, 0), TupleIterator(this, run_->num_tuples_)));
-  run->is_sorted_ = true;
-  return Status::OK();
-}
-
-bool Sorter::TupleSorter::Less(TupleRow* lhs, TupleRow* rhs) {
+bool Sorter::TupleSorter::Less(const TupleRow* lhs, const TupleRow* rhs) {
   --num_comparisons_till_free_;
   DCHECK_GE(num_comparisons_till_free_, 0);
   if (UNLIKELY(num_comparisons_till_free_ == 0)) {
@@ -909,105 +1127,127 @@ bool Sorter::TupleSorter::Less(TupleRow* lhs, TupleRow* rhs) {
   return comparator_.Less(lhs, rhs);
 }
 
-// Sort the sequence of tuples from [first, last).
-// Begin with a sorted sequence of size 1 [first, first+1).
+Status Sorter::TupleSorter::Sort(Run* run) {
+  DCHECK(run->is_finalized());
+  DCHECK(!run->is_sorted());
+  run_ = run;
+  RETURN_IF_ERROR(SortHelper(TupleIterator::Begin(run_), TupleIterator::End(run_)));
+  run_->set_sorted();
+  return Status::OK();
+}
+
+// Sort the sequence of tuples from [begin, end).
+// Begin with a sorted sequence of size 1 [begin, begin+1).
 // During each pass of the outermost loop, add the next tuple (at position 'i') to
 // the sorted sequence by comparing it to each element of the sorted sequence
 // (reverse order) to find its correct place in the sorted sequence, copying tuples
 // along the way.
-Status Sorter::TupleSorter::InsertionSort(const TupleIterator& first,
-    const TupleIterator& last) {
-  TupleIterator insert_iter = first;
-  insert_iter.Next();
-  for (; insert_iter.index_ < last.index_; insert_iter.Next()) {
+Status Sorter::TupleSorter::InsertionSort(const TupleIterator& begin,
+    const TupleIterator& end) {
+  DCHECK_LT(begin.index(), end.index());
+
+  // Hoist member variable lookups out of loop to avoid extra loads inside loop.
+  Run* run = run_;
+  int tuple_size = tuple_size_;
+  uint8_t* temp_tuple_buffer = temp_tuple_buffer_;
+
+  TupleIterator insert_iter = begin;
+  insert_iter.Next(run, tuple_size);
+  for (; insert_iter.index() < end.index(); insert_iter.Next(run, tuple_size)) {
     // insert_iter points to the tuple after the currently sorted sequence that must
-    // be inserted into the sorted sequence. Copy to temp_tuple_row_ since it may be
+    // be inserted into the sorted sequence. Copy to temp_tuple_buffer_ since it may be
     // overwritten by the one at position 'insert_iter - 1'
-    memcpy(temp_tuple_buffer_, insert_iter.current_tuple_, tuple_size_);
+    memcpy(temp_tuple_buffer, insert_iter.tuple(), tuple_size);
 
-    // 'iter' points to the tuple that temp_tuple_row_ will be compared to.
-    // 'copy_to' is the where iter should be copied to if it is >= temp_tuple_row_.
+    // 'iter' points to the tuple that temp_tuple_buffer will be compared to.
+    // 'copy_to' is where 'iter' should be copied to if it is >= temp_tuple_buffer.
     // copy_to always to the next row after 'iter'
     TupleIterator iter = insert_iter;
-    iter.Prev();
-    uint8_t* copy_to = insert_iter.current_tuple_;
-    while (Less(temp_tuple_row_, reinterpret_cast<TupleRow*>(&iter.current_tuple_))) {
-      memcpy(copy_to, iter.current_tuple_, tuple_size_);
-      copy_to = iter.current_tuple_;
-      // Break if 'iter' has reached the first row, meaning that temp_tuple_row_
-      // will be inserted in position 'first'
-      if (iter.index_ <= first.index_) break;
-      iter.Prev();
+    iter.Prev(run, tuple_size);
+    Tuple* copy_to = insert_iter.tuple();
+    while (Less(reinterpret_cast<TupleRow*>(&temp_tuple_buffer), iter.row())) {
+      memcpy(copy_to, iter.tuple(), tuple_size);
+      copy_to = iter.tuple();
+      // Break if 'iter' has reached the first row, meaning that the temp row
+      // will be inserted in position 'begin'
+      if (iter.index() <= begin.index()) break;
+      iter.Prev(run, tuple_size);
     }
 
-    memcpy(copy_to, temp_tuple_buffer_, tuple_size_);
+    memcpy(copy_to, temp_tuple_buffer, tuple_size);
   }
   RETURN_IF_CANCELLED(state_);
   RETURN_IF_ERROR(state_->GetQueryStatus());
   return Status::OK();
 }
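As an illustrative aside (not part of the patch): the same copy-into-temp, shift-right, write-into-gap structure, written for a plain int array:

// Illustrative only: insertion sort that copies the element being inserted
// into a temporary, shifts larger elements one slot to the right, and then
// writes the temporary into the gap, the same shape as the tuple version above.
void InsertionSortInts(int* begin, int* end) {
  for (int* insert = begin + 1; insert < end; ++insert) {
    int temp = *insert;  // Copy out; this slot may be overwritten below.
    int* pos = insert;
    while (pos > begin && temp < *(pos - 1)) {
      *pos = *(pos - 1);  // Shift the larger element right.
      --pos;
    }
    *pos = temp;
  }
}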
 
-Status Sorter::TupleSorter::Partition(TupleIterator first, TupleIterator last,
-    Tuple* pivot, Sorter::TupleSorter::TupleIterator* cut) {
-  // Copy pivot into temp_tuple since it points to a tuple within [first, last).
-  memcpy(temp_tuple_buffer_, pivot, tuple_size_);
-
-  last.Prev();
+Status Sorter::TupleSorter::Partition(TupleIterator begin,
+    TupleIterator end, const Tuple* pivot, TupleIterator* cut) {
+  // Hoist member variable lookups out of loop to avoid extra loads inside loop.
+  Run* run = run_;
+  int tuple_size = tuple_size_;
+  Tuple* temp_tuple = reinterpret_cast<Tuple*>(temp_tuple_buffer_);
+  Tuple* swap_tuple = reinterpret_cast<Tuple*>(swap_buffer_);
+
+  // Copy pivot into temp_tuple since it points to a tuple within [begin, end).
+  DCHECK(temp_tuple != NULL);
+  DCHECK(pivot != NULL);
+  memcpy(temp_tuple, pivot, tuple_size);
+
+  TupleIterator left = begin;
+  TupleIterator right = end;
+  right.Prev(run, tuple_size); // Set 'right' to the last tuple in range.
   while (true) {
     // Search for the first and last out-of-place elements, and swap them.
-    while (Less(reinterpret_cast<TupleRow*>(&first.current_tuple_), temp_tuple_row_)) {
-      first.Next();
+    while (Less(left.row(), reinterpret_cast<TupleRow*>(&temp_tuple))) {
+      left.Next(run, tuple_size);
     }
-    while (Less(temp_tuple_row_, reinterpret_cast<TupleRow*>(&last.current_tuple_))) {
-      last.Prev();
+    while (Less(reinterpret_cast<TupleRow*>(&temp_tuple), right.row())) {
+      right.Prev(run, tuple_size);
     }
 
-    if (first.index_ >= last.index_) break;
+    if (left.index() >= right.index()) break;
     // Swap first and last tuples.
-    Swap(first.current_tuple_, last.current_tuple_);
+    Swap(left.tuple(), right.tuple(), swap_tuple, tuple_size);
 
-    first.Next();
-    last.Prev();
+    left.Next(run, tuple_size);
+    right.Prev(run, tuple_size);
 
     RETURN_IF_CANCELLED(state_);
     RETURN_IF_ERROR(state_->GetQueryStatus());
   }
 
-  *cut = first;
+  *cut = left;
   return Status::OK();
 }
 
-Status Sorter::TupleSorter::SortHelper(TupleIterator first, TupleIterator last) {
+Status Sorter::TupleSorter::SortHelper(TupleIterator begin, TupleIterator end) {
   // Use insertion sort for smaller sequences.
-  while (last.index_ - first.index_ > INSERTION_THRESHOLD) {
-    // Select a pivot and call Partition() to split the tuples in [first, last) into two
+  while (end.index() - begin.index() > INSERTION_THRESHOLD) {
+    // Select a pivot and call Partition() to split the tuples in [begin, end) into two
     // groups (<= pivot and >= pivot) in-place. 'cut' is the index of the first tuple in
     // the second group.
-    Tuple* pivot = SelectPivot(first, last);
+    Tuple* pivot = SelectPivot(begin, end);
     TupleIterator cut;
-    RETURN_IF_ERROR(Partition(first, last, pivot, &cut));
+    RETURN_IF_ERROR(Partition(begin, end, pivot, &cut));
 
     // Recurse on the smaller partition. This limits stack size to log(n) stack frames.
-    if (cut.index_ - first.index_ < last.index_ - cut.index_) {
+    if (cut.index() - begin.index() < end.index() - cut.index()) {
       // Left partition is smaller.
-      RETURN_IF_ERROR(SortHelper(first, cut));
-      first = cut;
+      RETURN_IF_ERROR(SortHelper(begin, cut));
+      begin = cut;
     } else {
       // Right partition is equal or smaller.
-      RETURN_IF_ERROR(SortHelper(cut, last));
-      last = cut;
+      RETURN_IF_ERROR(SortHelper(cut, end));
+      end = cut;
     }
-
-    RETURN_IF_ERROR(SortHelper(cut, last));
-    last = cut;
-
-    RETURN_IF_CANCELLED(state_);
   }
-  RETURN_IF_ERROR(InsertionSort(first, last));
+
+  if (begin.index() < end.index()) RETURN_IF_ERROR(InsertionSort(begin, end));
   return Status::OK();
 }
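As an illustrative aside (not part of the patch): SortHelper() bounds stack depth by recursing only into the smaller partition and looping on the larger one, so at most O(log n) frames are live. A generic sketch of that control flow over a plain vector; it uses a three-way std::partition instead of the Hoare-style Partition() above, purely to keep the example short:

#include <algorithm>
#include <vector>

// Illustrative only: quicksort that recurses on the smaller partition and
// iterates on the larger one, bounding stack depth to O(log n).
void QuickSort(std::vector<int>* v, int begin, int end) {
  const int kInsertionThreshold = 16;
  while (end - begin > kInsertionThreshold) {
    int pivot = (*v)[begin + (end - begin) / 2];
    // Three-way split: [begin, lt) < pivot, [lt, gt) == pivot, [gt, end) > pivot.
    auto lt = std::partition(v->begin() + begin, v->begin() + end,
        [pivot](int x) { return x < pivot; });
    auto gt = std::partition(lt, v->begin() + end,
        [pivot](int x) { return x == pivot; });
    int lt_idx = lt - v->begin();
    int gt_idx = gt - v->begin();
    if (lt_idx - begin < end - gt_idx) {
      QuickSort(v, begin, lt_idx);  // Smaller side: recurse.
      begin = gt_idx;               // Larger side: keep looping.
    } else {
      QuickSort(v, gt_idx, end);
      end = lt_idx;
    }
  }
  // Small ranges are finished with insertion sort in the real code; std::sort
  // stands in for it here.
  std::sort(v->begin() + begin, v->begin() + end);
}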
 
-Tuple* Sorter::TupleSorter::SelectPivot(TupleIterator first, TupleIterator last) {
+Tuple* Sorter::TupleSorter::SelectPivot(TupleIterator begin, TupleIterator end) {
   // Select the median of three random tuples. The random selection avoids pathological
   // behaviour associated with techniques that pick a fixed element (e.g. picking
   // first/last/middle element) and taking the median tends to help us select better
@@ -1022,10 +1262,10 @@ Tuple* Sorter::TupleSorter::SelectPivot(TupleIterator first, TupleIterator last)
   // iterations.
   Tuple* tuples[3];
   for (int i = 0; i < 3; ++i) {
-    int64_t index = uniform_int<int64_t>(first.index_, last.index_ - 1)(rng_);
-    TupleIterator iter(this, index);
-    DCHECK(iter.current_tuple_ != NULL);
-    tuples[i] = reinterpret_cast<Tuple*>(iter.current_tuple_);
+    int64_t index = uniform_int<int64_t>(begin.index(), end.index() - 1)(rng_);
+    TupleIterator iter(run_, index);
+    DCHECK(iter.tuple() != NULL);
+    tuples[i] = iter.tuple();
   }
 
   return MedianOfThree(tuples[0], tuples[1], tuples[2]);
@@ -1067,13 +1307,13 @@ Tuple* Sorter::TupleSorter::MedianOfThree(Tuple* t1, Tuple* t2, Tuple* t3) {
   }
 }
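As an illustrative aside (not part of the patch): median-of-three is only a few comparisons. For plain integers it can be written as:

#include <algorithm>

// Illustrative only: return the median of three values; MedianOfThree() above
// does the same selection using the tuple comparator instead of operator<.
int MedianOfThreeInts(int a, int b, int c) {
  return std::max(std::min(a, b), std::min(std::max(a, b), c));
}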
 
-inline void Sorter::TupleSorter::Swap(uint8_t* left, uint8_t* right) {
-  memcpy(swap_buffer_, left, tuple_size_);
-  memcpy(left, right, tuple_size_);
-  memcpy(right, swap_buffer_, tuple_size_);
+inline void Sorter::TupleSorter::Swap(Tuple* left, Tuple* right, Tuple* swap_tuple,
+    int tuple_size) {
+  memcpy(swap_tuple, left, tuple_size);
+  memcpy(left, right, tuple_size);
+  memcpy(right, swap_tuple, tuple_size);
 }
 
-// Sorter methods
 Sorter::Sorter(const TupleRowComparator& compare_less_than,
     const vector<ExprContext*>& slot_materialize_expr_ctxs,
     RowDescriptor* output_row_desc, MemTracker* mem_tracker,
@@ -1096,17 +1336,9 @@ Sorter::Sorter(const TupleRowComparator& compare_less_than,
 }
 
 Sorter::~Sorter() {
-  // Delete blocks from the block mgr.
-  for (deque<Run*>::iterator it = sorted_runs_.begin();
-      it != sorted_runs_.end(); ++it) {
-    (*it)->DeleteAllBlocks();
-  }
-  for (deque<Run*>::iterator it = merging_runs_.begin();
-      it != merging_runs_.end(); ++it) {
-    (*it)->DeleteAllBlocks();
-  }
-  if (unsorted_run_ != NULL) unsorted_run_->DeleteAllBlocks();
-  block_mgr_->ClearReservations(block_mgr_client_);
+  DCHECK(sorted_runs_.empty());
+  DCHECK(merging_runs_.empty());
+  DCHECK(unsorted_run_ == NULL);
 }
 
 Status Sorter::Init() {
@@ -1118,6 +1350,7 @@ Status Sorter::Init() {
   unsorted_run_ = obj_pool_.Add(new Run(this, sort_tuple_desc, true));
 
   initial_runs_counter_ = ADD_COUNTER(profile_, "InitialRunsCreated", TUnit::UNIT);
+  spilled_runs_counter_ = ADD_COUNTER(profile_, "SpilledRuns", TUnit::UNIT);
   num_merges_counter_ = ADD_COUNTER(profile_, "TotalMergesPerformed", TUnit::UNIT);
   in_mem_sort_timer_ = ADD_TIMER(profile_, "InMemorySortTime");
   sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES);
@@ -1125,9 +1358,8 @@ Status Sorter::Init() {
   int min_blocks_required = BLOCKS_REQUIRED_FOR_MERGE;
   // Fixed and var-length blocks are separate, so we need BLOCKS_REQUIRED_FOR_MERGE
   // blocks for both if there is var-length data.
-  if (output_row_desc_->tuple_descriptors()[0]->HasVarlenSlots()) {
-    min_blocks_required *= 2;
-  }
+  if (has_var_len_slots_) min_blocks_required *= 2;
+
   RETURN_IF_ERROR(block_mgr_->RegisterClient(Substitute("Sorter ptr=$0", this),
       min_blocks_required, false, mem_tracker_, state_, &block_mgr_client_));
 
@@ -1142,17 +1374,12 @@ Status Sorter::AddBatch(RowBatch* batch) {
   int num_processed = 0;
   int cur_batch_index = 0;
   while (cur_batch_index < batch->num_rows()) {
-    if (has_var_len_slots_) {
-      RETURN_IF_ERROR(unsorted_run_->AddBatch<true>(
-          batch, cur_batch_index, &num_processed));
-    } else {
-      RETURN_IF_ERROR(unsorted_run_->AddBatch<false>(
-          batch, cur_batch_index, &num_processed));
-    }
+    RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, cur_batch_index, &num_processed));
+
     cur_batch_index += num_processed;
     if (cur_batch_index < batch->num_rows()) {
       // The current run is full. Sort it and begin the next one.
-      RETURN_IF_ERROR(SortRun());
+      RETURN_IF_ERROR(SortCurrentInputRun());
       RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks());
       unsorted_run_ = obj_pool_.Add(
           new Run(this, output_row_desc_->tuple_descriptors()[0], true));
@@ -1163,120 +1390,97 @@ Status Sorter::AddBatch(RowBatch* batch) {
 }
 
 Status Sorter::InputDone() {
-  // Sort the tuples accumulated so far in the current run.
-  RETURN_IF_ERROR(SortRun());
+  // Sort the tuples in the last run.
+  RETURN_IF_ERROR(SortCurrentInputRun());
 
   if (sorted_runs_.size() == 1) {
     // The entire input fit in one run. Read sorted rows in GetNext() directly
     // from the sorted run.
     RETURN_IF_ERROR(sorted_runs_.back()->PrepareRead());
-  } else {
-    // At least one merge is necessary.
-    int blocks_per_run = has_var_len_slots_ ? 2 : 1;
-    int min_buffers_for_merge = sorted_runs_.size() * blocks_per_run;
-    // Check if the final run needs to be unpinned.
-    bool unpinned_final = false;
-    if (block_mgr_->num_free_buffers() < min_buffers_for_merge - blocks_per_run) {
-      // Number of available buffers is less than the size of the final run and
-      // the buffers needed to read the remainder of the runs in memory.
-      // Unpin the final run.
-      RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks());
-      unpinned_final = true;
-    } else {
-      // No need to unpin the current run. There is enough memory to stream the
-      // other runs.
-      // TODO: revisit. It might be better to unpin some from this run if it means
-      // we can get double buffering in the other runs.
-    }
+    return Status::OK();
+  }
 
-    // For an intermediate merge, intermediate_merge_batch contains deep-copied rows from
-    // the input runs. If (unmerged_sorted_runs_.size() > max_runs_per_final_merge),
-    // one or more intermediate merges are required.
-    // TODO: Attempt to allocate more memory before doing intermediate merges. This may
-    // be possible if other operators have relinquished memory after the sort has built
-    // its runs.
-    if (min_buffers_for_merge > block_mgr_->available_allocated_buffers()) {
-      DCHECK(unpinned_final);
-      RETURN_IF_ERROR(MergeIntermediateRuns());
-    }
+  // At least one merge is necessary.
+  int blocks_per_run = has_var_len_slots_ ? 2 : 1;
+  int min_buffers_for_merge = sorted_runs_.size() * blocks_per_run;
+  // Check if the final run needs to be unpinned.
+  bool unpinned_final = false;
+  if (block_mgr_->num_free_buffers() < min_buffers_for_merge - blocks_per_run) {
+    // Number of available buffers is less than the size of the final run and
+    // the buffers needed to read the remainder of the runs in memory.
+    // Unpin the final run.
+    RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks());
+    unpinned_final = true;
+  } else {
+    // No need to unpin the current run. There is enough memory to stream the
+    // other runs.
+  // TODO: revisit. It might be better to unpin some from this run if it means
+    // we can get double buffering in the other runs.
+  }
 
-    // Create the final merger.
-    RETURN_IF_ERROR(CreateMerger(sorted_runs_.size()));
+  // For an intermediate merge, intermediate_merge_batch contains deep-copied rows from
+  // the input runs. If (unmerged_sorted_runs_.size() > max_runs_per_final_merge),
+  // one or more intermediate merges are required.
+  // TODO: Attempt to allocate more memory before doing intermediate merges. This may
+  // be possible if other operators have relinquished memory after the sort has built
+  // its runs.
+  if (min_buffers_for_merge > block_mgr_->available_allocated_buffers()) {
+    DCHECK(unpinned_final);
+    RETURN_IF_ERROR(MergeIntermediateRuns());
   }
+
+  // Create the final merger.
+  RETURN_IF_ERROR(CreateMerger(sorted_runs_.size()));
   return Status::OK();
 }
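As an illustrative aside (not part of the patch): the unpin decision above is simple arithmetic. Each input run needs one pinned block to stream during the merge (two when there is var-len data), and the final run is unpinned when the free buffers cannot cover the other runs. A worked sketch with made-up numbers:

#include <cassert>

// Illustrative only: with 10 sorted runs containing var-len data
// (blocks_per_run == 2) and 15 free buffers, 15 < 20 - 2, so the final run
// would have to be unpinned before merging.
int main() {
  int num_runs = 10;
  int blocks_per_run = 2;  // One fixed-len plus one var-len block per run.
  int min_buffers_for_merge = num_runs * blocks_per_run;
  int num_free_buffers = 15;
  bool unpin_final = num_free_buffers < min_buffers_for_merge - blocks_per_run;
  assert(unpin_final);
  return 0;
}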
 
 Status Sorter::GetNext(RowBatch* output_batch, bool* eos) {
   if (sorted_runs_.size() == 1) {
-    DCHECK(sorted_runs_.back()->is_pinned_);
-    // In this case, only TupleRows are copied into output_batch. Sorted tuples are left
-    // in the pinned blocks in the single sorted run.
-    RETURN_IF_ERROR(sorted_runs_.back()->GetNext<false>(output_batch, eos));
-    if (*eos) sorted_runs_.back()->TransferResources(output_batch);
+    DCHECK(sorted_runs_.back()->is_pinned());
+    return sorted_runs_.back()->GetNext<false>(output_batch, eos);
   } else {
-    // In this case, rows are deep copied into output_batch.
-    RETURN_IF_ERROR(merger_->GetNext(output_batch, eos));
+    return merger_->GetNext(output_batch, eos);
   }
-  return Status::OK();
 }
 
 Status Sorter::Reset() {
+  DCHECK(unsorted_run_ == NULL);
   merger_.reset();
-  merging_runs_.clear();
-  sorted_runs_.clear();
+  // Free resources from the current runs.
+  CleanupAllRuns();
   obj_pool_.Clear();
-  DCHECK(unsorted_run_ == NULL);
   unsorted_run_ = obj_pool_.Add(
       new Run(this, output_row_desc_->tuple_descriptors()[0], true));
   RETURN_IF_ERROR(unsorted_run_->Init());
   return Status::OK();
 }
 
-Status Sorter::SortRun() {
-  BufferedBlockMgr::Block* last_block = unsorted_run_->fixed_len_blocks_.back();
-  if (last_block->valid_data_len() > 0) {
-    sorted_data_size_->Add(last_block->valid_data_len());
-  } else {
-    last_block->Delete();
-    unsorted_run_->fixed_len_blocks_.pop_back();
-  }
-  if (has_var_len_slots_) {
-    DCHECK(unsorted_run_->var_len_copy_block_ != NULL);
-    last_block = unsorted_run_->var_len_blocks_.back();
-    if (last_block->valid_data_len() > 0) {
-      sorted_data_size_->Add(last_block->valid_data_len());
-    } else {
-      last_block->Delete();
-      unsorted_run_->var_len_blocks_.pop_back();
-      if (unsorted_run_->var_len_blocks_.size() == 0) {
-        unsorted_run_->var_len_copy_block_->Delete();
-        unsorted_run_->var_len_copy_block_ = NULL;
-      }
-    }
-  }
+void Sorter::Close() {
+  CleanupAllRuns();
+  block_mgr_->ClearReservations(block_mgr_client_);
+  obj_pool_.Clear();
+}
+
+void Sorter::CleanupAllRuns() {
+  Run::CleanupRuns(&sorted_runs_);
+  Run::CleanupRuns(&merging_runs_);
+  if (unsorted_run_ != NULL) unsorted_run_->DeleteAllBlocks();
+  unsorted_run_ = NULL;
+}
+
+Status Sorter::SortCurrentInputRun() {
+  RETURN_IF_ERROR(unsorted_run_->FinalizeInput());
+
   {
     SCOPED_TIMER(in_mem_sort_timer_);
     RETURN_IF_ERROR(in_mem_tuple_sorter_->Sort(unsorted_run_));
   }
   sorted_runs_.push_back(unsorted_run_);
+  sorted_data_size_->Add(unsorted_run_->TotalBytes());
   unsorted_run_ = NULL;
-  return Status::OK();
-}
 
-uint64_t Sorter::EstimateMergeMem(uint64_t available_blocks,
-    RowDescriptor* row_desc, int merge_batch_size) {
-  bool has_var_len_slots = row_desc->tuple_descriptors()[0]->HasVarlenSlots();
-  int blocks_per_run = has_var_len_slots ? 2 : 1;
-  int max_input_runs_per_merge = (available_blocks / blocks_per_run) - 1;
-  // During a merge, the batches corresponding to the input runs contain only TupleRows.
-  // (The data itself is in pinned blocks held by the run)
-  uint64_t input_batch_mem =
-      merge_batch_size * sizeof(Tuple*) * max_input_runs_per_merge;
-  // Since rows are deep copied into the output batch for the merger, use a pessimistic
-  // estimate of the memory required.
-  uint64_t output_batch_mem = RowBatch::AT_CAPACITY_MEM_USAGE;
-
-  return input_batch_mem + output_batch_mem;
+  RETURN_IF_CANCELLED(state_);
+  return Status::OK();
 }
 
 Status Sorter::MergeIntermediateRuns() {
@@ -1294,16 +1498,19 @@ Status Sorter::MergeIntermediateRuns() {
   scoped_ptr<RowBatch> intermediate_merge_batch;
   while (sorted_runs_.size() > max_runs_per_final_merge) {
     // An intermediate merge adds one merge to unmerged_sorted_runs_.
-    // Merging 'runs - (max_runs_final_ - 1)' number of runs is sifficient to guarantee
+    // Merging 'runs - (max_runs_final_ - 1)' number of runs is sufficient to guarantee
     // that the final merge can be performed.
     int num_runs_to_merge = min<int>(max_runs_per_intermediate_merge,
         sorted_runs_.size() - max_runs_per_intermediate_merge);
     RETURN_IF_ERROR(CreateMerger(num_runs_to_merge));
     RowBatch intermediate_merge_batch(*output_row_desc_, state_->batch_size(),
         mem_tracker_);
-    // merged_run is the new sorted run that is produced by the intermediate merge.
+    // 'merged_run' is the new sorted run that is produced by the intermediate merge.
+    // We add 'merged_run' to 'sorted_runs_' immediately so that it is cleaned up
+    // in Close().
     Run* merged_run = obj_pool_.Add(
         new Run(this, output_row_desc_->tuple_descriptors()[0], false));
+    sorted_runs_.push_back(merged_run);
     RETURN_IF_ERROR(merged_run->Init());
     bool eos = false;
     while (!eos) {
@@ -1311,38 +1518,14 @@ Status Sorter::MergeIntermediateRuns() {
       int num_copied;
       RETURN_IF_CANCELLED(state_);
       RETURN_IF_ERROR(merger_->GetNext(&intermediate_merge_batch, &eos));
-      Status ret_status;
-      if (has_var_len_slots_) {
-        ret_status = merged_run->AddBatch<true>(&intermediate_merge_batch,
-            0, &num_copied);
-      } else {
-        ret_status = merged_run->AddBatch<false>(&intermediate_merge_batch,
-            0, &num_copied);
-      }
-      if (!ret_status.ok()) return ret_status;
+      RETURN_IF_ERROR(
+          merged_run->AddIntermediateBatch(&intermediate_merge_batch, 0, &num_copied));
 
       DCHECK_EQ(num_copied, intermediate_merge_batch.num_rows());
       intermediate_merge_batch.Reset();
     }
 
-    BufferedBlockMgr::Block* last_block = merged_run->fixed_len_blocks_.back();
-    if (last_block->valid_data_len() > 0) {
-      RETURN_IF_ERROR(last_block->Unpin());
-    } else {
-      last_block->Delete();
-      merged_run->fixed_len_blocks_.pop_back();
-    }
-    if (has_var_len_slots_) {
-      last_block = merged_run->var_len_blocks_.back();
-      if (last_block->valid_data_len() > 0) {
-        RETURN_IF_ERROR(last_block->Unpin());
-      } else {
-        last_block->Delete();
-        merged_run->var_len_blocks_.pop_back();
-      }
-    }
-    merged_run->is_pinned_ = false;
-    sorted_runs_.push_back(merged_run);
+    RETURN_IF_ERROR(merged_run->FinalizeInput());
   }
 
   return Status::OK();
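As an illustrative aside (not part of the patch): each intermediate pass merges only as many runs as needed to make the final merge feasible, capped by the per-merge limit. A standalone sketch of the run-count formula used above, with hypothetical numbers:

#include <algorithm>
#include <cassert>

// Illustrative only: mirror of the num_runs_to_merge computation, assuming 14
// sorted runs and a limit of 10 runs per merge.
int main() {
  int max_runs_per_intermediate_merge = 10;
  int num_sorted_runs = 14;
  int num_runs_to_merge = std::min(max_runs_per_intermediate_merge,
      num_sorted_runs - max_runs_per_intermediate_merge);
  assert(num_runs_to_merge == 4);  // min(10, 14 - 10)
  return 0;
}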
@@ -1350,13 +1533,12 @@ Status Sorter::MergeIntermediateRuns() {
 
 Status Sorter::CreateMerger(int num_runs) {
   DCHECK_GT(num_runs, 1);
-
   // Clean up the runs from the previous merge.
-  for (deque<Run*>::iterator it = merging_runs_.begin();
-      it != merging_runs_.end(); ++it) {
-    (*it)->DeleteAllBlocks();
-  }
-  merging_runs_.clear();
+  Run::CleanupRuns(&merging_runs_);
+
+  // TODO: 'deep_copy_input' is set to true, which forces the merger to copy all rows
+  // from the runs being merged. This is unnecessary overhead that is not required if we
+  // correctly transfer resources.
   merger_.reset(
       new SortedRunMerger(compare_less_than_, output_row_desc_, profile_, true));
 

