http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/runtime/sorted-run-merger.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorted-run-merger.h b/be/src/runtime/sorted-run-merger.h index d622fd9..6ffc544 100644 --- a/be/src/runtime/sorted-run-merger.h +++ b/be/src/runtime/sorted-run-merger.h @@ -29,7 +29,7 @@ class RowDescriptor; class RuntimeProfile; /// SortedRunMerger is used to merge multiple sorted runs of tuples. A run is a sorted -/// sequence of row batches, which are fetched from a RunBatchSupplier function object. +/// sequence of row batches, which are fetched from a RunBatchSupplierFn function object. /// Merging is implemented using a binary min-heap that maintains the run with the next /// tuple in sorted order at the top of the heap. /// @@ -39,12 +39,18 @@ class RuntimeProfile; /// If false, GetNext() only copies tuple pointers (TupleRows) into the output batch, /// and transfers resource ownership from the input batches to the output batch when /// an input batch is processed. +/// +/// SortedRunMerger cannot handle the 'need_to_return' resource-transfer model, so +/// if the RunBatchSupplierFn can return batches with the 'need_to_return' flag set, +/// the merger must be constructed with 'deep_copy_input' true. TODO: once +/// 'need_to_return' is removed, this limitation goes away. class SortedRunMerger { public: /// Function that returns the next batch of rows from an input sorted run. The batch - /// is owned by the supplier (i.e. not SortedRunMerger). eos is indicated by an NULL - /// batch being returned. - typedef boost::function<Status (RowBatch**)> RunBatchSupplier; + /// is owned by the supplier (i.e. not SortedRunMerger). eos is indicated by a NULL + /// batch being returned. The returned batch can have any number of rows (including + /// zero). + typedef boost::function<Status (RowBatch**)> RunBatchSupplierFn; SortedRunMerger(const TupleRowComparator& comparator, RowDescriptor* row_desc, RuntimeProfile* profile, bool deep_copy_input); @@ -52,17 +58,22 @@ class SortedRunMerger { /// Prepare this merger to merge and return rows from the sorted runs in 'input_runs'. /// Retrieves the first batch from each run and sets up the binary heap implementing /// the priority queue. - Status Prepare(const std::vector<RunBatchSupplier>& input_runs); + Status Prepare(const std::vector<RunBatchSupplierFn>& input_runs); /// Return the next batch of sorted rows from this merger. Status GetNext(RowBatch* output_batch, bool* eos); - /// Called to finalize a merge when deep_copy is false. Transfers resources from - /// all input batches to the specified output batch. - void TransferAllResources(RowBatch* transfer_resource_batch); - private: - class BatchedRowSupplier; + class SortedRunWrapper; + + /// Remove the current row from the run at the top of the heap and try to advance to + /// that run's next row. If 'deep_copy_input_' is false, 'transfer_batch' must be + /// supplied to attach resources to. + /// + /// When AdvanceMinRow() returns, the previous min run has advanced to its next row + /// and the heap is reordered accordingly. The SortedRunWrapper is removed from the + /// heap if this was its last row. Resources of any fully-consumed input batch are + /// transferred to 'transfer_batch'. + Status AdvanceMinRow(RowBatch* transfer_batch); /// Assuming the element at parent_index is the only out of place element in the heap, /// restore the heap property (i.e. swap elements so parent <= children).
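The comment above describes a textbook sift-down. For reference, a minimal standalone sketch of the operation, using a plain vector and a generic comparator (illustrative names, not the merger's actual SortedRunWrapper-based code):

#include <cstddef>
#include <utility>
#include <vector>

// Restore the min-heap property, assuming the element at 'parent' is the only
// out-of-place element. The children of element i live at 2*i+1 and 2*i+2.
template <typename T, typename Less>
void SiftDown(std::vector<T>* heap, std::size_t parent, const Less& less) {
  const std::size_t size = heap->size();
  while (true) {
    const std::size_t left = 2 * parent + 1;
    const std::size_t right = 2 * parent + 2;
    std::size_t least = parent;
    if (left < size && less((*heap)[left], (*heap)[least])) least = left;
    if (right < size && less((*heap)[right], (*heap)[least])) least = right;
    if (least == parent) return;  // Heap property restored.
    std::swap((*heap)[parent], (*heap)[least]);
    parent = least;  // The displaced element may still be out of place below.
  }
}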
@@ -73,9 +84,9 @@ class SortedRunMerger { /// and the children of the element at index i are 2*i+1 and 2*i+2. The heap property is /// that the row of the parent element is <= the rows of the child elements according to /// the comparator comparator_. - /// The BatchedRowSupplier objects used in the min_heap_ are owned by this + /// The SortedRunWrapper objects used in the min_heap_ are owned by this /// SortedRunMerger instance. - std::vector<BatchedRowSupplier*> min_heap_; + std::vector<SortedRunWrapper*> min_heap_; /// Row comparator. Returns true if lhs < rhs. TupleRowComparator comparator_; @@ -87,7 +98,7 @@ class SortedRunMerger { /// True if rows must be deep copied into the output batch. bool deep_copy_input_; - /// Pool of BatchedRowSupplier instances. + /// Pool of SortedRunWrapper instances. ObjectPool pool_; /// Times calls to GetNext().
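Taken together, Prepare(), GetNext() and AdvanceMinRow() implement a standard k-way merge over a binary min-heap. A hedged sketch of that control flow, merging sorted int vectors in place of row batches and reusing the SiftDown() sketch above (deep-copying and resource transfer omitted):

#include <cstddef>
#include <vector>

// Cursor into one sorted run; stands in for SortedRunWrapper.
struct RunCursor {
  const std::vector<int>* run;
  std::size_t pos;
};

void MergeRuns(const std::vector<std::vector<int>>& runs, std::vector<int>* out) {
  auto less = [](const RunCursor& a, const RunCursor& b) {
    return (*a.run)[a.pos] < (*b.run)[b.pos];
  };
  std::vector<RunCursor> heap;
  for (const std::vector<int>& run : runs) {
    if (!run.empty()) heap.push_back(RunCursor{&run, 0});
  }
  // Establish the heap property bottom-up (cf. Prepare()).
  for (std::size_t i = heap.size(); i-- > 0;) SiftDown(&heap, i, less);
  while (!heap.empty()) {
    // The run with the smallest current element is at the top of the heap.
    out->push_back((*heap[0].run)[heap[0].pos]);
    // Advance the min run, dropping it when exhausted (cf. AdvanceMinRow()).
    if (++heap[0].pos == heap[0].run->size()) {
      heap[0] = heap.back();
      heap.pop_back();
    }
    if (!heap.empty()) SiftDown(&heap, 0, less);
  }
}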
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/runtime/sorter.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc index 299dd2f..c242b6b 100644 --- a/be/src/runtime/sorter.cc +++ b/be/src/runtime/sorter.cc @@ -45,105 +45,193 @@ const string MEM_ALLOC_FAILED_ERROR_MSG = "Failed to allocate block for $0-length " "data needed for sorting. Reducing query concurrency or increasing the " "memory limit may help this query to complete successfully."; -/// A run is a sequence of blocks containing tuples that are or will eventually be in -/// sorted order. -/// A run may maintain two sequences of blocks - one containing the tuples themselves, -/// (i.e. fixed-len slots and ptrs to var-len data), and the other for the var-length -/// column data pointed to by those tuples. -/// Tuples in a run may be sorted in place (in-memory) and merged using a merger. +/// Delete all non-null blocks in 'blocks' and clear vector. +static void DeleteAndClearBlocks(vector<BufferedBlockMgr::Block*>* blocks) { + for (BufferedBlockMgr::Block* block: *blocks) { + if (block != NULL) block->Delete(); + } + blocks->clear(); +} + +static int NumNonNullBlocks(const vector<BufferedBlockMgr::Block*>& blocks) { + int count = 0; + for (BufferedBlockMgr::Block* block: blocks) { + if (block != NULL) ++count; + } + return count; +} + +/// A run is a sequence of tuples. The run can be sorted or unsorted (in which case the +/// Sorter will sort it). A run comprises a sequence of fixed-length blocks containing the +/// tuples themselves (i.e. fixed-len slots that may contain ptrs to var-length data), and +/// an optional sequence of var-length blocks containing the var-length data. +/// +/// Runs are either "initial runs" constructed from the sorter's input by evaluating +/// the expressions in 'sort_tuple_slot_expr_ctxs_' or "intermediate runs" constructed +/// by merging already-sorted runs. Initial runs are sorted in-place in memory. Once +/// sorted, runs can be spilled to disk to free up memory. Sorted runs are merged by +/// SortedRunMerger, either to produce the final sorted output or to produce another +/// sorted run. +/// +/// The expected calling sequence of functions is as follows: +/// * Init() to initialize the run and allocate initial blocks. +/// * Add*Batch() to add batches of tuples to the run. +/// * FinalizeInput() to signal that no more batches will be added. +/// * If the run is unsorted, it must be sorted. After that set_sorted() must be called. +/// * Once sorted, the run is ready to read in sorted order for merging or final output. +/// * PrepareRead() to allocate resources for reading the run. +/// * GetNext() (if there was a single run) or GetNextBatch() (when merging multiple runs) +/// to read from the run. +/// * Once reading is done, DeleteAllBlocks() should be called to free resources. class Sorter::Run { public: - /// materialize_slots is true for runs constructed from input rows. The input rows are - /// materialized into single sort tuples using the expressions in - /// sort_tuple_slot_expr_ctxs_. For intermediate merges, the tuples are already - /// materialized so materialize_slots is false.
- Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool materialize_slots); + Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool initial_run); - ~Run() { DeleteAllBlocks(); } + ~Run() { + DCHECK(fixed_len_blocks_.empty()); + DCHECK(var_len_blocks_.empty()); + DCHECK(var_len_copy_block_ == NULL); + } /// Initialize the run for input rows by allocating the minimum number of required /// blocks - one block for fixed-len data added to fixed_len_blocks_, one for the /// initially unsorted var-len data added to var_len_blocks_, and one to copy sorted - /// var-len data into (var_len_copy_block_). + /// var-len data into var_len_copy_block_. Status Init(); - /// Add a batch of input rows to the current run. Returns the number - /// of rows actually added in num_processed. If the run is full (no more blocks can - /// be allocated), num_processed may be less than the number of rows in the batch. - /// If materialize_slots_ is true, materializes the input rows using the expressions - /// in sorter_->sort_tuple_slot_expr_ctxs_, else just copies the input rows. - template <bool has_var_len_data> - Status AddBatch(RowBatch* batch, int start_index, int* num_processed); + /// Add the rows from 'batch' starting at 'start_index' to the current run. Returns the + /// number of rows actually added in 'num_processed'. If the run is full (no more blocks + /// can be allocated), 'num_processed' may be less than the number of remaining rows in + /// the batch. AddInputBatch() materializes the input rows using the expressions in + /// sorter_->sort_tuple_slot_expr_ctxs_, while AddIntermediateBatch() just copies rows. + Status AddInputBatch(RowBatch* batch, int start_index, int* num_processed) { + DCHECK(initial_run_); + if (has_var_len_slots_) { + return AddBatchInternal<true, true>(batch, start_index, num_processed); + } else { + return AddBatchInternal<false, true>(batch, start_index, num_processed); + } + } + Status AddIntermediateBatch(RowBatch* batch, int start_index, int* num_processed) { + DCHECK(!initial_run_); + if (has_var_len_slots_) { + return AddBatchInternal<true, false>(batch, start_index, num_processed); + } else { + return AddBatchInternal<false, false>(batch, start_index, num_processed); + } + } - /// Attaches all fixed-len and var-len blocks to the given row batch. - void TransferResources(RowBatch* row_batch); + /// Called after the final call to Add*Batch() to do any bookkeeping necessary to + /// finalize the run. Must be called before sorting or merging the run. + Status FinalizeInput(); /// Unpins all the blocks in a sorted run. Var-length column data is copied into new /// blocks in sorted order. Pointers in the original tuples are converted to offsets - /// from the beginning of the sequence of var-len data blocks. + /// from the beginning of the sequence of var-len data blocks. Returns an error and + /// may leave some blocks pinned if an error is encountered in the block mgr. Status UnpinAllBlocks(); /// Deletes all blocks. void DeleteAllBlocks(); - /// Interface for merger - get the next batch of rows from this run. The callee (Run) - /// still owns the returned batch. Calls GetNext(RowBatch*, bool*). - Status GetNextBatch(RowBatch** sorted_batch); + /// Prepare to read a sorted run. Pins the first block(s) in the run if the run was + /// previously unpinned. + Status PrepareRead(); - private: - friend class Sorter; - friend class TupleSorter; + /// Interface for merger - get the next batch of rows from this run. This run still + /// owns the returned batch. 
Calls GetNext(RowBatch*, bool*). + Status GetNextBatch(RowBatch** sorted_batch); - /// Fill output_batch with rows from this run. If convert_offset_to_ptr is true, offsets + /// Fill output_batch with rows from this run. If CONVERT_OFFSET_TO_PTR is true, offsets /// in var-length slots are converted back to pointers. Only row pointers are copied - /// into output_batch. - /// If this run was unpinned, one block (2 if there are var-len slots) is pinned while + /// into output_batch. eos is set to true after all rows from the run are returned. + /// If eos is true, the returned output_batch has zero rows and has no attached blocks. + /// If this run was unpinned, one block (two if there are var-len slots) is pinned while /// rows are filled into output_batch. The block is unpinned before the next block is - /// pinned. Atmost 1 (2) block(s) will be pinned at any time. - /// If the run was pinned, the blocks are not unpinned (Sorter holds on to the memory). - /// In either case, all rows in output_batch will have their fixed and var-len data from - /// the same block. - /// TODO: If we leave the last run to be merged in memory, the fixed-len blocks can be - /// unpinned as they are consumed. - template <bool convert_offset_to_ptr> + /// pinned, so at most one (two if there are var-len slots) block(s) will be pinned at + /// once. If the run was pinned, the blocks are not unpinned and each block is attached + /// to 'output_batch' once all rows referencing data in the block have been returned, + /// either in the current batch or previous batches. In both pinned and unpinned cases, + /// all rows in output_batch will reference at most one fixed-len and one var-len block. + template <bool CONVERT_OFFSET_TO_PTR> Status GetNext(RowBatch* output_batch, bool* eos); - /// Check if a run can be extended by allocating additional blocks from the block - /// manager. Always true when building a sorted run in an intermediate merge, because - /// the current block(s) can be unpinned before getting the next free block (so a block - /// is always available) - bool CanExtendRun() const; + /// Delete all blocks in 'runs' and clear 'runs'. + static void CleanupRuns(deque<Run*>* runs) { + for (Run* run: *runs) { + run->DeleteAllBlocks(); + } + runs->clear(); + } + + /// Return total amount of fixed and var len data in run, not including blocks that + /// were already transferred. + int64_t TotalBytes() const; - /// Collect the non-null var-len (e.g. STRING) slots from 'src' in var_slots and return - /// the total length of all var_len slots in total_var_len. - void CollectNonNullVarSlots(Tuple* src, vector<StringValue*>* var_len_values, - int* total_var_len); + inline bool is_pinned() const { return is_pinned_; } + inline bool is_finalized() const { return is_finalized_; } + inline bool is_sorted() const { return is_sorted_; } + inline void set_sorted() { is_sorted_ = true; } + inline int64_t num_tuples() const { return num_tuples_; } + + private: + /// TupleIterator needs access to internals to iterate over tuples. + friend class TupleIterator; - /// Check if the current run can be extended by a block. Add the newly allocated block - /// to block_sequence, or set added to false if the run could not be extended. - /// If the run is sorted (produced by an intermediate merge), unpin the last block in - /// block_sequence before allocating and adding a new block - the run can always be - /// extended in this case. 
If the run is unsorted, check max_blocks_in_unsorted_run_ - /// to see if a block can be added to the run. Also updates the sort bytes counter. - Status TryAddBlock(vector<BufferedBlockMgr::Block*>* block_sequence, bool* added); + /// Templatized implementation of Add*Batch() functions. + /// INITIAL_RUN and HAS_VAR_LEN_SLOTS are template arguments for performance and must + /// match 'initial_run_' and 'has_var_len_slots_'. + template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN> + Status AddBatchInternal(RowBatch* batch, int start_index, int* num_processed); - /// Prepare to read a sorted run. Pins the first block(s) in the run if the run was - /// previously unpinned. - Status PrepareRead(); + /// Finalize the list of blocks: delete empty final blocks and unpin the previous block + /// if the run is unpinned. + Status FinalizeBlocks(vector<BufferedBlockMgr::Block*>* blocks); - /// Copy the StringValue data in var_values to dest in order and update the StringValue - /// ptrs to point to the copied data. + /// Collect the non-null var-len (e.g. STRING) slots from 'src' in 'var_len_values' and + /// return the total length of all var-len values in 'total_var_len'. + void CollectNonNullVarSlots(Tuple* src, vector<StringValue*>* var_len_values, + int* total_var_len); + + enum AddBlockMode { KEEP_PREV_PINNED, UNPIN_PREV }; + + /// Try to extend the current run by a block. If 'mode' is KEEP_PREV_PINNED, try to + /// allocate a new block, which may fail to extend the run due to lack of memory. If + /// mode is 'UNPIN_PREV', unpin the previous block in block_sequence before allocating + /// and adding a new block - this never fails due to lack of memory. + /// + /// Returns an error status only if the block manager returns an error. If no error is + /// encountered, sets 'added' to indicate whether the run was extended and returns + /// Status::OK(). The new block is appended to 'block_sequence'. + Status TryAddBlock(AddBlockMode mode, vector<BufferedBlockMgr::Block*>* block_sequence, + bool* added); + + /// Advance to the next read block. If the run is pinned, has no effect. If the run + /// is unpinned, atomically pin the block at 'block_index' + 1 in 'blocks' and delete + /// the block at 'block_index'. + Status PinNextReadBlock(vector<BufferedBlockMgr::Block*>* blocks, int block_index); + + /// Copy the StringValues in 'var_values' to 'dest' in order and update the StringValue + /// ptrs in 'dest' to point to the copied data. void CopyVarLenData(const vector<StringValue*>& var_values, uint8_t* dest); - /// Copy the StringValue in var_values to dest in order. Update the StringValue ptrs to - /// contain a packed offset for the copied data comprising block_index and the offset - /// relative to block_start. + /// Copy the StringValues in 'var_values' to 'dest' in order. Update the StringValue + /// ptrs in 'dest' to contain a packed offset for the copied data comprising + /// block_index and the offset relative to block_start. void CopyVarLenDataConvertOffset(const vector<StringValue*>& var_values, int block_index, const uint8_t* block_start, uint8_t* dest); - /// Returns true if we have var-len slots and there are var-len blocks. + /// Convert encoded offsets to valid pointers in tuple with layout 'sort_tuple_desc_'. + /// 'tuple' is modified in-place. Returns true if the pointers refer to the block at + /// 'var_len_blocks_index_' and were successfully converted or false if the var len + /// data is in the next block, in which case 'tuple' is unmodified. 
+ bool ConvertOffsetsToPtrs(Tuple* tuple); + + /// Returns true if we have var-len blocks in the run. inline bool HasVarLenBlocks() const { - return has_var_len_slots_ && !var_len_blocks_.empty(); + // Shouldn't have any blocks unless there are slots. + DCHECK(var_len_blocks_.empty() || has_var_len_slots_); + return !var_len_blocks_.empty(); } /// Parent sorter object. @@ -153,67 +241,146 @@ class Sorter::Run { /// sort_tuple_desc_) before sorting. const TupleDescriptor* sort_tuple_desc_; - /// Sizes of sort tuple and block. + /// The size in bytes of the sort tuple. const int sort_tuple_size_; - const bool has_var_len_slots_; + /// Number of tuples per block in a run. + const int block_capacity_; - /// True if the sort tuple must be materialized from the input batch in AddBatch(). - /// materialize_slots_ is true for runs being constructed from input batches, and - /// is false for runs being constructed from intermediate merges. - const bool materialize_slots_; + const bool has_var_len_slots_; - /// True if the run is sorted. Set to true after an in-memory sort, and initialized to - /// true for runs resulting from merges. - bool is_sorted_; + /// True if this is an initial run. False implies this is a sorted intermediate run + /// resulting from merging other runs. + const bool initial_run_; - /// True if all blocks in the run are pinned. + /// True if all blocks in the run are pinned. Initial runs start off pinned and + /// can be unpinned. Intermediate runs are always unpinned. bool is_pinned_; + /// True after FinalizeInput() is called. No more tuples can be added after the + /// run is finalized. + bool is_finalized_; + + /// True if the tuples in the run are currently in sorted order. + /// Always true for intermediate runs. + bool is_sorted_; + /// Sequence of blocks in this run containing the fixed-length portion of the sort /// tuples comprising this run. The data pointed to by the var-len slots are in - /// var_len_blocks_. - /// If is_sorted_ is true, the tuples in fixed_len_blocks_ will be in sorted order. - /// fixed_len_blocks_[i] is NULL iff it has been deleted. + /// var_len_blocks_. A run can have zero blocks if no rows are appended. + /// If the run is sorted, the tuples in fixed_len_blocks_ will be in sorted order. + /// fixed_len_blocks_[i] is NULL iff it has been transferred or deleted. vector<BufferedBlockMgr::Block*> fixed_len_blocks_; /// Sequence of blocks in this run containing the var-length data corresponding to the - /// var-length columns from fixed_len_blocks_. These are reconstructed to be in sorted - /// order in UnpinAllBlocks(). - /// var_len_blocks_[i] is NULL iff it has been deleted. + /// var-length columns from fixed_len_blocks_. In intermediate runs, the var-len data is + /// always stored in the same order as the fixed-length tuples. In initial runs, the + /// var-len data is initially in unsorted order, but is reshuffled into sorted order in + /// UnpinAllBlocks(). A run can have no var len blocks if there are no var len slots or + /// if all the var len data is empty or NULL. + /// var_len_blocks_[i] is NULL iff it has been transferred or deleted. vector<BufferedBlockMgr::Block*> var_len_blocks_; - /// If there are var-len slots, an extra pinned block is used to copy out var-len data - /// into a new sequence of blocks in sorted order. var_len_copy_block_ stores this - /// extra allocated block. + /// For initial unsorted runs, an extra pinned block is needed to reorder var-len data
'var_len_copy_block_' stores this extra + /// block. Deleted in UnpinAllBlocks(). + /// TODO: in case of in-memory runs, this could be deleted earlier to free up memory. BufferedBlockMgr::Block* var_len_copy_block_; - /// Number of tuples so far in this run. + /// Number of tuples added so far to this run. int64_t num_tuples_; /// Number of tuples returned via GetNext(), maintained for debug purposes. int64_t num_tuples_returned_; - /// buffered_batch_ is used to return TupleRows to the merger when this run is being - /// merged. buffered_batch_ is returned in calls to GetNextBatch(). + /// Used to implement GetNextBatch() interface required for the merger. scoped_ptr<RowBatch> buffered_batch_; /// Members used when a run is read in GetNext(). - /// The index into the fixed_ and var_len_blocks_ vectors of the current blocks being - /// processed in GetNext(). + /// The index into 'fixed_' and 'var_len_blocks_' of the blocks being read in GetNext(). int fixed_len_blocks_index_; int var_len_blocks_index_; - /// If true, pin the next fixed and var-len blocks and delete the previous ones - /// in the next call to GetNext(). Set during the previous call to GetNext(). - /// Not used if a run is already pinned. - bool pin_next_fixed_len_block_; - bool pin_next_var_len_block_; + /// If true, the last call to GetNext() reached the end of the previous fixed or + /// var-len block. The next call to GetNext() must increment 'fixed_len_blocks_index_' + /// or 'var_len_blocks_index_'. It must also pin the next block if the run is unpinned. + bool end_of_fixed_len_block_; + bool end_of_var_len_block_; /// Offset into the current fixed length data block being processed. int fixed_len_block_offset_; -}; // class Sorter::Run +}; + +/// Helper class used to iterate over tuples in a run during sorting. +class Sorter::TupleIterator { + public: + /// Creates an iterator pointing at the tuple with the given 'index' in the 'run'. + /// The index can be in the range [0, run->num_tuples()]. If it is equal to + /// run->num_tuples(), the iterator points to one past the end of the run, so + /// invoking Prev() will cause the iterator to point at the last tuple in the run. + /// 'run' must be finalized. + TupleIterator(Sorter::Run* run, int64_t index); + + /// Default constructor used for local variable. Produces invalid iterator that must + /// be assigned before use. + TupleIterator() : index_(-1), tuple_(NULL), buffer_start_index_(-1), + buffer_end_index_(-1), block_index_(-1) { } + + /// Create an iterator pointing to the first tuple in the run. + static inline TupleIterator Begin(Sorter::Run* run) { return TupleIterator(run, 0); } + + /// Create an iterator pointing one past the end of the run. + static inline TupleIterator End(Sorter::Run* run) { + return TupleIterator(run, run->num_tuples()); + } + + /// Increments 'index_' and sets 'tuple_' to point to the next tuple in the run. + /// Increments 'block_index_' and advances to the next block if the next tuple is in + /// the next block. Can be advanced one past the last tuple in the run, but is not + /// valid to dereference 'tuple_' in that case. 'run' and 'tuple_size' are passed as + /// arguments to avoid redundantly storing the same values in multiple iterators in + /// perf-critical algorithms. + inline void Next(Sorter::Run* run, int tuple_size); + + /// The reverse of Next(). Can advance one before the first tuple in the run, but it is + /// invalid to dereference 'tuple_' in that case. 
+ inline void Prev(Sorter::Run* run, int tuple_size); + + inline int64_t index() const { return index_; } + inline Tuple* tuple() const { return reinterpret_cast<Tuple*>(tuple_); } + /// Returns current tuple in TupleRow format. The caller should not modify the row. + inline const TupleRow* row() const { + return reinterpret_cast<const TupleRow*>(&tuple_); + } + private: + // Move to the next block in the run (or do nothing if at end of run). + // This is the slow path for Next(). + void NextBlock(Sorter::Run* run, int tuple_size); + + // Move to the previous block in the run (or do nothing if at beginning of run). + // This is the slow path for Prev(). + void PrevBlock(Sorter::Run* run, int tuple_size); + + /// Index of the current tuple in the run. + /// Can be -1 or run->num_tuples() if Next() or Prev() moves iterator outside of run. + int64_t index_; + + /// Pointer to the current tuple. + /// Will be an invalid pointer outside of current buffer if Next() or Prev() moves + /// iterator outside of run. + uint8_t* tuple_; + + /// Indices of start and end tuples of block at block_index_. I.e. the current block + /// has tuples with indices in range [buffer_start_index_, buffer_end_index_). + int64_t buffer_start_index_; + int64_t buffer_end_index_; + + /// Index into fixed_len_blocks_ of the block containing the current tuple. + /// If index_ is negative or past end of run, will point to the first or last block + /// in run respectively. + int block_index_; +}; /// Sorts a sequence of tuples from a run in place using a provided tuple comparator. /// Quick sort is used for sequences of tuples larger than 16 elements, and insertion sort @@ -227,108 +394,17 @@ class Sorter::TupleSorter { ~TupleSorter(); /// Performs a quicksort for tuples in 'run' followed by an insertion sort to - /// finish smaller blocks. Returns an error status if any error is encountered or - /// if the query is cancelled. + /// finish smaller blocks. Only valid to call if this is an initial run that has not + /// yet been sorted. Returns an error status if any error is encountered or if the + /// query is cancelled. Status Sort(Run* run); private: static const int INSERTION_THRESHOLD = 16; - /// Helper class used to iterate over tuples in a run during quick sort and insertion - /// sort. - class TupleIterator { - public: - /// Creates an iterator pointing at the tuple with the given 'index' in parent->run_. - /// The index can be in the range [0, run->num_tuples()]. If it is equal to - /// run->num_tuples(), the iterator points to one past the end of the run, so - /// invoking Prev() will cause the iterator to point at the last tuple in the run. - TupleIterator(TupleSorter* parent, int64_t index) - : parent_(parent), - index_(index), - current_tuple_(NULL) { - DCHECK_GE(index, 0); - DCHECK_LE(index, parent_->run_->num_tuples_); - // If the run is empty, only index_ and current_tuple_ are initialized. - if (parent_->run_->num_tuples_ == 0) return; - // If the iterator is initialized to past the end, set up buffer_start_ and - // block_index_ as if it pointing to the last tuple. Add tuple_size_ bytes to - // current_tuple_, so everything is correct when Prev() is invoked.
- int past_end_bytes = 0; - if (UNLIKELY(index >= parent_->run_->num_tuples_)) { - past_end_bytes = parent->tuple_size_; - index_ = parent_->run_->num_tuples_; - index = index_ - 1; - } - block_index_ = index / parent->block_capacity_; - buffer_start_ = parent->run_->fixed_len_blocks_[block_index_]->buffer(); - int block_offset = (index % parent->block_capacity_) * parent->tuple_size_; - current_tuple_ = buffer_start_ + block_offset + past_end_bytes; - } - - /// Default constructor used for local variable. - TupleIterator() - : parent_(NULL), - index_(-1), - current_tuple_(NULL) { } - - /// Sets 'current_tuple_' to point to the next tuple in the run. Increments 'block_index_' - /// and advances to the next block if the next tuple is in the next block. - /// Can be advanced one past the last tuple in the run, but is not valid to - /// dereference 'current_tuple_' in that case. - void Next() { - current_tuple_ += parent_->tuple_size_; - ++index_; - if (UNLIKELY(current_tuple_ > buffer_start_ + parent_->last_tuple_block_offset_ && - index_ < parent_->run_->num_tuples_)) { - // Don't increment block index, etc. past the end. - ++block_index_; - DCHECK_LT(block_index_, parent_->run_->fixed_len_blocks_.size()); - buffer_start_ = parent_->run_->fixed_len_blocks_[block_index_]->buffer(); - current_tuple_ = buffer_start_; - } - } - - /// The reverse of Next(). Can advance one before the first tuple in the run, but it - /// is invalid to dereference 'current_tuple_' in that case. - void Prev() { - current_tuple_ -= parent_->tuple_size_; - --index_; - if (UNLIKELY(current_tuple_ < buffer_start_ && index_ >= 0)) { - --block_index_; - DCHECK_GE(block_index_, 0); - buffer_start_ = parent_->run_->fixed_len_blocks_[block_index_]->buffer(); - current_tuple_ = buffer_start_ + parent_->last_tuple_block_offset_; - } - } - - private: - friend class TupleSorter; - - /// Pointer to the tuple sorter. - TupleSorter* parent_; - - /// Index of the current tuple in the run. - int64_t index_; - - /// Pointer to the current tuple. - uint8_t* current_tuple_; - - /// Start of the buffer containing current tuple. - uint8_t* buffer_start_; - - /// Index into run_.fixed_len_blocks_ of the block containing the current tuple. - int block_index_; - }; - /// Size of the tuples in memory. const int tuple_size_; - /// Number of tuples per block in a run. - const int block_capacity_; - - /// Offset in bytes of the last tuple in a block, calculated from block and tuple sizes. - const int last_tuple_block_offset_; - /// Tuple comparator with method Less() that returns true if lhs < rhs. const TupleRowComparator comparator_; @@ -343,8 +419,7 @@ class Sorter::TupleSorter { Run* run_; /// Temporarily allocated space to copy and swap tuples (Both are used in Partition()). - /// temp_tuple_ points to temp_tuple_buffer_. Owned by this TupleSorter instance. - TupleRow* temp_tuple_row_; + /// Owned by this TupleSorter instance. uint8_t* temp_tuple_buffer_; uint8_t* swap_buffer_; @@ -356,48 +431,49 @@ class Sorter::TupleSorter { /// Wrapper around comparator_.Less(). Also call comparator_.FreeLocalAllocations() /// on every 'state_->batch_size()' invocations of comparator_.Less(). Returns true /// if 'lhs' is less than 'rhs'. - bool Less(TupleRow* lhs, TupleRow* rhs); + bool Less(const TupleRow* lhs, const TupleRow* rhs); - /// Performs an insertion sort for rows in the range [first, last) in a run. - /// Returns an error status if there is any error or if the query is cancelled. 
- Status InsertionSort(const TupleIterator& first, const TupleIterator& last); + /// Perform an insertion sort for rows in the range [begin, end) in a run. + /// Only valid to call for ranges of size at least 1. + Status InsertionSort(const TupleIterator& begin, const TupleIterator& end); - /// Partitions the sequence of tuples in the range [first, last) in a run into two + /// Partitions the sequence of tuples in the range [begin, end) in a run into two /// groups around the pivot tuple - i.e. tuples in first group are <= the pivot, and /// tuples in the second group are >= pivot. Tuples are swapped in place to create the /// groups and the index to the first element in the second group is returned in 'cut'. /// Return an error status if any error is encountered or if the query is cancelled. - Status Partition(TupleIterator first, TupleIterator last, Tuple* pivot, + Status Partition(TupleIterator begin, TupleIterator end, const Tuple* pivot, TupleIterator* cut); - /// Performs a quicksort of rows in the range [first, last) followed by insertion sort + /// Performs a quicksort of rows in the range [begin, end) followed by insertion sort /// for smaller groups of elements. Return an error status for any errors or if the /// query is cancelled. - Status SortHelper(TupleIterator first, TupleIterator last); + Status SortHelper(TupleIterator begin, TupleIterator end); - /// Select a pivot to partition [first, last). - Tuple* SelectPivot(TupleIterator first, TupleIterator last); + /// Select a pivot to partition [begin, end). + Tuple* SelectPivot(TupleIterator begin, TupleIterator end); /// Return median of three tuples according to the sort comparator. Tuple* MedianOfThree(Tuple* t1, Tuple* t2, Tuple* t3); - /// Swaps tuples pointed to by left and right using the swap buffer. - void Swap(uint8_t* left, uint8_t* right); -}; // class TupleSorter + /// Swaps tuples pointed to by left and right using 'swap_tuple'. + static void Swap(Tuple* left, Tuple* right, Tuple* swap_tuple, int tuple_size); +}; // Sorter::Run methods Sorter::Run::Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, - bool materialize_slots) + bool initial_run) : sorter_(parent), sort_tuple_desc_(sort_tuple_desc), sort_tuple_size_(sort_tuple_desc->byte_size()), + block_capacity_(parent->block_mgr_->max_block_size() / sort_tuple_size_), has_var_len_slots_(sort_tuple_desc->HasVarlenSlots()), - materialize_slots_(materialize_slots), - is_sorted_(!materialize_slots), - is_pinned_(true), + initial_run_(initial_run), + is_pinned_(initial_run), + is_finalized_(false), + is_sorted_(!initial_run), var_len_copy_block_(NULL), - num_tuples_(0) { -} + num_tuples_(0) { } Status Sorter::Run::Init() { BufferedBlockMgr::Block* block = NULL; @@ -418,7 +494,8 @@ Status Sorter::Run::Init() { return status; } var_len_blocks_.push_back(block); - if (!is_sorted_) { + if (initial_run_) { + // Need additional var len block to reorder var len data in UnpinAllBlocks(). 
RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock( sorter_->block_mgr_client_, NULL, &var_len_copy_block_)); if (var_len_copy_block_ == NULL) { @@ -428,25 +505,33 @@ Status Sorter::Run::Init() { } } } - if (!is_sorted_) sorter_->initial_runs_counter_->Add(1); + if (initial_run_) { + sorter_->initial_runs_counter_->Add(1); + } else { + sorter_->spilled_runs_counter_->Add(1); + } return Status::OK(); } -template <bool has_var_len_data> -Status Sorter::Run::AddBatch(RowBatch* batch, int start_index, int* num_processed) { +template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN> +Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_processed) { + DCHECK(!is_finalized_); DCHECK(!fixed_len_blocks_.empty()); + DCHECK_EQ(HAS_VAR_LEN_SLOTS, has_var_len_slots_); + DCHECK_EQ(INITIAL_RUN, initial_run_); + *num_processed = 0; BufferedBlockMgr::Block* cur_fixed_len_block = fixed_len_blocks_.back(); - DCHECK_EQ(materialize_slots_, !is_sorted_); - if (!materialize_slots_) { - // If materialize slots is false the run is being constructed for an - // intermediate merge and the sort tuples have already been materialized. - // The input row should have the same schema as the sort tuples. + if (!INITIAL_RUN) { + // For intermediate merges, the input row is the sort tuple. DCHECK_EQ(batch->row_desc().tuple_descriptors().size(), 1); DCHECK_EQ(batch->row_desc().tuple_descriptors()[0], sort_tuple_desc_); } + /// Keep initial unsorted runs pinned in memory so we can sort them. + const AddBlockMode add_mode = INITIAL_RUN ? KEEP_PREV_PINNED : UNPIN_PREV; + // Input rows are copied/materialized into tuples allocated in fixed_len_blocks_. // The variable length column data are copied into blocks stored in var_len_blocks_. // Input row processing is split into two loops. @@ -469,8 +554,8 @@ Status Sorter::Run::AddBatch(RowBatch* batch, int start_index, int* num_processe int total_var_len = 0; TupleRow* input_row = batch->GetRow(cur_input_index); Tuple* new_tuple = cur_fixed_len_block->Allocate<Tuple>(sort_tuple_size_); - if (materialize_slots_) { - new_tuple->MaterializeExprs<has_var_len_data, true>(input_row, *sort_tuple_desc_, + if (INITIAL_RUN) { + new_tuple->MaterializeExprs<HAS_VAR_LEN_SLOTS, true>(input_row, *sort_tuple_desc_, sorter_->sort_tuple_slot_expr_ctxs_, NULL, &string_values, &total_var_len); if (total_var_len > sorter_->block_mgr_->max_block_size()) { return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, Substitute( @@ -479,17 +564,17 @@ Status Sorter::Run::AddBatch(RowBatch* batch, int start_index, int* num_processe } } else { memcpy(new_tuple, input_row->GetTuple(0), sort_tuple_size_); - if (has_var_len_data) { + if (HAS_VAR_LEN_SLOTS) { CollectNonNullVarSlots(new_tuple, &string_values, &total_var_len); } } - if (has_var_len_data) { + if (HAS_VAR_LEN_SLOTS) { DCHECK_GT(var_len_blocks_.size(), 0); BufferedBlockMgr::Block* cur_var_len_block = var_len_blocks_.back(); if (cur_var_len_block->BytesRemaining() < total_var_len) { bool added; - RETURN_IF_ERROR(TryAddBlock(&var_len_blocks_, &added)); + RETURN_IF_ERROR(TryAddBlock(add_mode, &var_len_blocks_, &added)); if (added) { cur_var_len_block = var_len_blocks_.back(); } else { @@ -502,13 +587,12 @@ Status Sorter::Run::AddBatch(RowBatch* batch, int start_index, int* num_processe // Sorting of tuples containing array values is not implemented. The planner // combined with projection should guarantee that none are in each tuple. 
- for (const SlotDescriptor* collection_slot: - sort_tuple_desc_->collection_slots()) { - DCHECK(new_tuple->IsNull(collection_slot->null_indicator_offset())); + for (const SlotDescriptor* coll_slot: sort_tuple_desc_->collection_slots()) { + DCHECK(new_tuple->IsNull(coll_slot->null_indicator_offset())); } uint8_t* var_data_ptr = cur_var_len_block->Allocate<uint8_t>(total_var_len); - if (materialize_slots_) { + if (INITIAL_RUN) { CopyVarLenData(string_values, var_data_ptr); } else { DCHECK_EQ(var_len_blocks_.back(), cur_var_len_block); @@ -525,51 +609,60 @@ Status Sorter::Run::AddBatch(RowBatch* batch, int start_index, int* num_processe // tuples. If the run is already too long, return. if (cur_input_index < batch->num_rows()) { bool added; - RETURN_IF_ERROR(TryAddBlock(&fixed_len_blocks_, &added)); - if (added) { - cur_fixed_len_block = fixed_len_blocks_.back(); - } else { - return Status::OK(); - } + RETURN_IF_ERROR(TryAddBlock(add_mode, &fixed_len_blocks_, &added)); + if (!added) return Status::OK(); + cur_fixed_len_block = fixed_len_blocks_.back(); } } return Status::OK(); } -void Sorter::Run::TransferResources(RowBatch* row_batch) { - DCHECK(row_batch != NULL); - for (BufferedBlockMgr::Block* block: fixed_len_blocks_) { - if (block != NULL) row_batch->AddBlock(block); - } - fixed_len_blocks_.clear(); - for (BufferedBlockMgr::Block* block: var_len_blocks_) { - if (block != NULL) row_batch->AddBlock(block); +Status Sorter::Run::FinalizeInput() { + DCHECK(!is_finalized_); + + RETURN_IF_ERROR(FinalizeBlocks(&fixed_len_blocks_)); + if (has_var_len_slots_) { + RETURN_IF_ERROR(FinalizeBlocks(&var_len_blocks_)); } - var_len_blocks_.clear(); - if (var_len_copy_block_ != NULL) { - row_batch->AddBlock(var_len_copy_block_); - var_len_copy_block_ = NULL; + is_finalized_ = true; + return Status::OK(); +} + +Status Sorter::Run::FinalizeBlocks(vector<BufferedBlockMgr::Block*>* blocks) { + DCHECK_GT(blocks->size(), 0); + BufferedBlockMgr::Block* last_block = blocks->back(); + if (last_block->valid_data_len() > 0) { + DCHECK_EQ(initial_run_, is_pinned_); + if (!is_pinned_) { + // Unpin the last block of this unpinned run. We've finished writing the run so + // all blocks in the run can now be unpinned. + RETURN_IF_ERROR(last_block->Unpin()); + } + } else { + last_block->Delete(); + blocks->pop_back(); } + return Status::OK(); } void Sorter::Run::DeleteAllBlocks() { - for (BufferedBlockMgr::Block* block: fixed_len_blocks_) { - if (block != NULL) block->Delete(); - } - fixed_len_blocks_.clear(); - for (BufferedBlockMgr::Block* block: var_len_blocks_) { - if (block != NULL) block->Delete(); - } - var_len_blocks_.clear(); - if (var_len_copy_block_ != NULL) { - var_len_copy_block_->Delete(); - var_len_copy_block_ = NULL; - } + DeleteAndClearBlocks(&fixed_len_blocks_); + DeleteAndClearBlocks(&var_len_blocks_); + if (var_len_copy_block_ != NULL) var_len_copy_block_->Delete(); + var_len_copy_block_ = NULL; } Status Sorter::Run::UnpinAllBlocks() { + DCHECK(is_sorted_); + DCHECK(initial_run_); + DCHECK(is_pinned_); + DCHECK(is_finalized_); + // A list of var len blocks to replace 'var_len_blocks_'. Note that after we are done + // we may have a different number of blocks, because internal fragmentation may leave + // slightly different amounts of wasted space at the end of each block. 
vector<BufferedBlockMgr::Block*> sorted_var_len_blocks; sorted_var_len_blocks.reserve(var_len_blocks_.size()); + vector<StringValue*> string_values; int total_var_len; string_values.reserve(sort_tuple_desc_->string_slots().size()); @@ -578,12 +671,22 @@ Status Sorter::Run::UnpinAllBlocks() { DCHECK(var_len_copy_block_ != NULL); sorted_var_len_blocks.push_back(var_len_copy_block_); cur_sorted_var_len_block = var_len_copy_block_; + // Set var_len_copy_block_ to NULL since it was moved into sorted_var_len_blocks. + var_len_copy_block_ = NULL; + } else if (has_var_len_slots_) { + // If we don't have any var-len blocks, clean up the copy block. + DCHECK(var_len_copy_block_ != NULL); + var_len_copy_block_->Delete(); + var_len_copy_block_ = NULL; } else { DCHECK(var_len_copy_block_ == NULL); } for (int i = 0; i < fixed_len_blocks_.size(); ++i) { BufferedBlockMgr::Block* cur_fixed_block = fixed_len_blocks_[i]; + // Skip converting the pointers if no var-len slots, or if all the values are null + // or zero-length. This will possibly leave zero-length pointers pointing to + // arbitrary memory, but zero-length data cannot be dereferenced anyway. if (HasVarLenBlocks()) { for (int block_offset = 0; block_offset < cur_fixed_block->valid_data_len(); block_offset += sort_tuple_size_) { @@ -593,8 +696,8 @@ Status Sorter::Run::UnpinAllBlocks() { DCHECK(cur_sorted_var_len_block != NULL); if (cur_sorted_var_len_block->BytesRemaining() < total_var_len) { bool added; - RETURN_IF_ERROR(TryAddBlock(&sorted_var_len_blocks, &added)); - DCHECK(added); + RETURN_IF_ERROR(TryAddBlock(UNPIN_PREV, &sorted_var_len_blocks, &added)); + DCHECK(added) << "TryAddBlock() with UNPIN_PREV should not fail to add"; cur_sorted_var_len_block = sorted_var_len_blocks.back(); } uint8_t* var_data_ptr = @@ -607,36 +710,33 @@ Status Sorter::Run::UnpinAllBlocks() { RETURN_IF_ERROR(cur_fixed_block->Unpin()); } - if (has_var_len_slots_ && var_len_blocks_.size() > 0) { + if (HasVarLenBlocks()) { DCHECK_GT(sorted_var_len_blocks.back()->valid_data_len(), 0); RETURN_IF_ERROR(sorted_var_len_blocks.back()->Unpin()); } // Clear var_len_blocks_ and replace it with the contents of sorted_var_len_blocks. - for (BufferedBlockMgr::Block* var_block: var_len_blocks_) { - var_block->Delete(); - } - var_len_blocks_.clear(); + DeleteAndClearBlocks(&var_len_blocks_); sorted_var_len_blocks.swap(var_len_blocks_); - // Set var_len_copy_block_ to NULL since it's now in var_len_blocks_ and is no longer - // needed. - var_len_copy_block_ = NULL; is_pinned_ = false; + sorter_->spilled_runs_counter_->Add(1); return Status::OK(); } Status Sorter::Run::PrepareRead() { + DCHECK(is_finalized_); + DCHECK(is_sorted_); + fixed_len_blocks_index_ = 0; fixed_len_block_offset_ = 0; var_len_blocks_index_ = 0; - pin_next_fixed_len_block_ = pin_next_var_len_block_ = false; + end_of_fixed_len_block_ = end_of_var_len_block_ = fixed_len_blocks_.empty(); num_tuples_returned_ = 0; buffered_batch_.reset(new RowBatch(*sorter_->output_row_desc_, sorter_->state_->batch_size(), sorter_->mem_tracker_)); - // If the run is pinned, merge is not invoked, so buffered_batch_ is not needed - // and the individual blocks do not need to be pinned. + // If the run is pinned, all blocks are already pinned, so we're ready to read. if (is_pinned_) return Status::OK(); // Attempt to pin the first fixed and var-length blocks.
In either case, pinning may @@ -653,7 +753,7 @@ Status Sorter::Run::PrepareRead() { } } - if (has_var_len_slots_ && var_len_blocks_.size() > 0) { + if (HasVarLenBlocks()) { bool pinned; RETURN_IF_ERROR(var_len_blocks_[0]->Pin(&pinned)); // Temporary work-around for IMPALA-1590. Fail the query with OOM rather than @@ -668,86 +768,72 @@ Status Sorter::Run::PrepareRead() { } Status Sorter::Run::GetNextBatch(RowBatch** output_batch) { - if (buffered_batch_.get() != NULL) { - buffered_batch_->Reset(); - // Fill more rows into buffered_batch_. - bool eos; - if (has_var_len_slots_ && !is_pinned_) { - RETURN_IF_ERROR(GetNext<true>(buffered_batch_.get(), &eos)); - if (buffered_batch_->num_rows() == 0 && !eos) { - // No rows were filled because GetNext() had to read the next var-len block - // Call GetNext() again. - RETURN_IF_ERROR(GetNext<true>(buffered_batch_.get(), &eos)); - } - } else { - RETURN_IF_ERROR(GetNext<false>(buffered_batch_.get(), &eos)); - } - DCHECK(eos || buffered_batch_->num_rows() > 0); - if (eos) { - // No rows are filled in GetNext() on eos, so this is safe. - DCHECK_EQ(buffered_batch_->num_rows(), 0); - buffered_batch_.reset(); - // The merge is complete. Delete the last blocks in the run. - fixed_len_blocks_.back()->Delete(); - fixed_len_blocks_[fixed_len_blocks_.size() - 1] = NULL; - if (HasVarLenBlocks()) { - var_len_blocks_.back()->Delete(); - var_len_blocks_[var_len_blocks_.size() - 1] = NULL; - } - } + DCHECK(buffered_batch_ != NULL); + buffered_batch_->Reset(); + // Fill more rows into buffered_batch_. + bool eos; + if (HasVarLenBlocks() && !is_pinned_) { + RETURN_IF_ERROR(GetNext<true>(buffered_batch_.get(), &eos)); + } else { + RETURN_IF_ERROR(GetNext<false>(buffered_batch_.get(), &eos)); } - // *output_batch == NULL indicates eos. + if (eos) { + // Setting output_batch to NULL signals eos to the caller, so GetNext() is not + // allowed to attach resources to the batch on eos. + DCHECK_EQ(buffered_batch_->num_rows(), 0); + DCHECK_EQ(buffered_batch_->num_blocks(), 0); + *output_batch = NULL; + return Status::OK(); + } *output_batch = buffered_batch_.get(); return Status::OK(); } -template <bool convert_offset_to_ptr> +template <bool CONVERT_OFFSET_TO_PTR> Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) { - if (fixed_len_blocks_index_ == fixed_len_blocks_.size()) { + // Var-len offsets are converted only when reading var-len data from unpinned runs. + if (HasVarLenBlocks()) DCHECK_EQ(!is_pinned_, CONVERT_OFFSET_TO_PTR); + // We shouldn't convert var len offsets if there are no blocks, since in that case + // they must all be null or zero-length strings, which don't point into a valid block. + if (!HasVarLenBlocks()) DCHECK(!CONVERT_OFFSET_TO_PTR); + + if (end_of_fixed_len_block_ && + fixed_len_blocks_index_ >= static_cast<int>(fixed_len_blocks_.size()) - 1) { + if (is_pinned_) { + // All blocks were previously attached to output batches. GetNextBatch() assumes + // that we don't attach resources to the batch on eos. + DCHECK_EQ(NumNonNullBlocks(fixed_len_blocks_), 0); + DCHECK_EQ(NumNonNullBlocks(var_len_blocks_), 0); + } else { + // We held onto the last fixed or var len blocks without transferring them to the + // caller. We signalled MarkNeedToReturn() to the caller, so we can safely delete + // them now to free memory. 
+ if (!fixed_len_blocks_.empty()) DCHECK_EQ(NumNonNullBlocks(fixed_len_blocks_), 1); + if (!var_len_blocks_.empty()) DCHECK_EQ(NumNonNullBlocks(var_len_blocks_), 1); + } + DeleteAllBlocks(); *eos = true; DCHECK_EQ(num_tuples_returned_, num_tuples_); return Status::OK(); - } else { - *eos = false; } - BufferedBlockMgr::Block* fixed_len_block = fixed_len_blocks_[fixed_len_blocks_index_]; - - if (!is_pinned_) { - // Pin the next block and delete the previous if set in the previous call to - // GetNext(). - if (pin_next_fixed_len_block_) { - fixed_len_blocks_[fixed_len_blocks_index_ - 1]->Delete(); - fixed_len_blocks_[fixed_len_blocks_index_ - 1] = NULL; - bool pinned; - RETURN_IF_ERROR(fixed_len_block->Pin(&pinned)); - // Temporary work-around for IMPALA-2344. Fail the query with OOM rather than - // DCHECK in case block pin fails. - if (!pinned) { - Status status = Status::MemLimitExceeded(); - status.AddDetail(Substitute(PIN_FAILED_ERROR_MSG, "fixed")); - return status; - } - pin_next_fixed_len_block_ = false; - } - if (pin_next_var_len_block_) { - var_len_blocks_[var_len_blocks_index_ - 1]->Delete(); - var_len_blocks_[var_len_blocks_index_ - 1] = NULL; - bool pinned; - RETURN_IF_ERROR(var_len_blocks_[var_len_blocks_index_]->Pin(&pinned)); - // Temporary work-around for IMPALA-2344. Fail the query with OOM rather than - // DCHECK in case block pin fails. - if (!pinned) { - Status status = Status::MemLimitExceeded(); - status.AddDetail(Substitute(PIN_FAILED_ERROR_MSG, "variable")); - return status; - } - pin_next_var_len_block_ = false; - } + // Advance the fixed or var len block if we reached the end in the previous call to + // GetNext(). + if (end_of_fixed_len_block_) { + RETURN_IF_ERROR(PinNextReadBlock(&fixed_len_blocks_, fixed_len_blocks_index_)); + ++fixed_len_blocks_index_; + fixed_len_block_offset_ = 0; + end_of_fixed_len_block_ = false; + } + if (end_of_var_len_block_) { + RETURN_IF_ERROR(PinNextReadBlock(&var_len_blocks_, var_len_blocks_index_)); + ++var_len_blocks_index_; + end_of_var_len_block_ = false; } - // GetNext fills rows into the output batch until a block boundary is reached. + // Fills rows into the output batch until a block boundary is reached. + BufferedBlockMgr::Block* fixed_len_block = fixed_len_blocks_[fixed_len_blocks_index_]; DCHECK(fixed_len_block != NULL); while (!output_batch->AtCapacity() && fixed_len_block_offset_ < fixed_len_block->valid_data_len()) { @@ -755,57 +841,76 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) { Tuple* input_tuple = reinterpret_cast<Tuple*>( fixed_len_block->buffer() + fixed_len_block_offset_); - if (convert_offset_to_ptr) { - // Convert the offsets in the var-len slots in input_tuple back to pointers. - const vector<SlotDescriptor*>& string_slots = sort_tuple_desc_->string_slots(); - for (int i = 0; i < string_slots.size(); ++i) { - SlotDescriptor* slot_desc = string_slots[i]; - if (input_tuple->IsNull(slot_desc->null_indicator_offset())) continue; - - DCHECK(slot_desc->type().IsVarLenStringType()); - StringValue* value = reinterpret_cast<StringValue*>( - input_tuple->GetSlot(slot_desc->tuple_offset())); - - // packed_offset includes the block index in the upper 32 bits and the block - // offset in the lower 32 bits. See CopyVarLenDataConvertOffset(). 
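This packed-offset encoding, retained in the new ConvertOffsetsToPtrs() further below, is simple enough to sketch standalone. A hedged sketch, assuming the block index and within-block byte offset each fit in 32 bits as the code requires; the helper names are illustrative, not Impala's:

#include <cstdint>

// Pack a block index into the upper 32 bits and a within-block byte offset
// into the lower 32 bits, so the pair fits in a pointer-sized slot.
inline uint64_t PackOffset(uint32_t block_index, uint32_t block_offset) {
  return (static_cast<uint64_t>(block_index) << 32) | block_offset;
}

// Reverse of PackOffset(): recover the block index and offset.
inline void UnpackOffset(uint64_t packed, uint32_t* block_index,
    uint32_t* block_offset) {
  *block_index = static_cast<uint32_t>(packed >> 32);
  *block_offset = static_cast<uint32_t>(packed & 0xFFFFFFFF);
}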
- uint64_t packed_offset = reinterpret_cast<uint64_t>(value->ptr); - int block_index = packed_offset >> 32; - int block_offset = packed_offset & 0xFFFFFFFF; - - if (block_index > var_len_blocks_index_) { - // We've reached the block boundary for the current var-len block. - // This tuple will be returned in the next call to GetNext(). - DCHECK_EQ(block_index, var_len_blocks_index_ + 1); - DCHECK_EQ(block_offset, 0); - DCHECK_EQ(i, 0); - var_len_blocks_index_ = block_index; - pin_next_var_len_block_ = true; - break; - } else { - DCHECK_EQ(block_index, var_len_blocks_index_) << packed_offset; - // Calculate the address implied by the offset and assign it. - value->ptr = reinterpret_cast<char*>( - var_len_blocks_[var_len_blocks_index_]->buffer() + block_offset); - } // if (block_index > var_len_blocks_index_) - } // for (int i = 0; i < string_slots.size(); ++i) - - // The var-len data is in the next block, so end this call to GetNext(). - if (pin_next_var_len_block_) break; - } // if (convert_offset_to_ptr) - - int output_row_idx = output_batch->AddRow(); - output_batch->GetRow(output_row_idx)->SetTuple(0, input_tuple); + if (CONVERT_OFFSET_TO_PTR && !ConvertOffsetsToPtrs(input_tuple)) { + // The var-len data is in the next block. We are done with the current block, so + // return rows we've accumulated so far and advance to the next block in the next + // GetNext() call. This is needed for the unpinned case where we can't pin the next + // block before we delete the current block. + if (is_pinned_) { + // Attach block to batch. We don't need the block any more and we don't need to + // reclaim the block's memory, since we already have the sorted data in + // memory. The caller can delete the block when it wants to. + output_batch->AddBlock(var_len_blocks_[var_len_blocks_index_]); + var_len_blocks_[var_len_blocks_index_] = NULL; + } else { + // To iterate over unpinned runs, we need to exchange this block for the next + // in the next GetNext() call, so we need to hold onto the block and signal to + // the caller that the block is going to be deleted. + output_batch->MarkNeedToReturn(); + } + end_of_var_len_block_ = true; + break; + } + output_batch->GetRow(output_batch->AddRow())->SetTuple(0, input_tuple); output_batch->CommitLastRow(); fixed_len_block_offset_ += sort_tuple_size_; ++num_tuples_returned_; } - // Reached the block boundary, need to move to the next block. if (fixed_len_block_offset_ >= fixed_len_block->valid_data_len()) { - pin_next_fixed_len_block_ = true; - ++fixed_len_blocks_index_; - fixed_len_block_offset_ = 0; + // Reached the block boundary, need to move to the next block. + if (is_pinned_) { + // Attach block to batch. Caller can delete the block when it wants to. + output_batch->AddBlock(fixed_len_blocks_[fixed_len_blocks_index_]); + fixed_len_blocks_[fixed_len_blocks_index_] = NULL; + + // Also attach the last var-len block at eos, since it won't be attached elsewhere. + if (HasVarLenBlocks() && + fixed_len_blocks_index_ == fixed_len_blocks_.size() - 1) { + output_batch->AddBlock(var_len_blocks_[var_len_blocks_index_]); + var_len_blocks_[var_len_blocks_index_] = NULL; + } + } else { + // To iterate over unpinned runs, we need to exchange this block for the next + // in the next GetNext() call, so we need to hold onto the block and signal to + // the caller that the block is going to be deleted.
+ output_batch->MarkNeedToReturn(); + } + end_of_fixed_len_block_ = true; } + *eos = false; + return Status::OK(); +} + +Status Sorter::Run::PinNextReadBlock(vector<BufferedBlockMgr::Block*>* blocks, + int block_index) { + DCHECK_GE(block_index, 0); + DCHECK_LT(block_index, blocks->size() - 1); + BufferedBlockMgr::Block* curr_block = (*blocks)[block_index]; + BufferedBlockMgr::Block* next_block = (*blocks)[block_index + 1]; + DCHECK_EQ(is_pinned_, next_block->is_pinned()); + if (is_pinned_) { + // The current block was attached to a batch and 'next_block' is already pinned. + DCHECK(curr_block == NULL); + return Status::OK(); + } + bool pinned; + // Atomically delete the previous block and pin this one. Should not fail due to lack + // of memory. Pin() deletes the block even in error cases, so we need to remove it from + // the vector first to avoid an inconsistent state. + (*blocks)[block_index] = NULL; + RETURN_IF_ERROR(next_block->Pin(&pinned, curr_block, false)); + DCHECK(pinned) << "Atomic delete and pin should not fail without error."; return Status::OK(); } @@ -823,24 +928,26 @@ void Sorter::Run::CollectNonNullVarSlots(Tuple* src, } } -Status Sorter::Run::TryAddBlock(vector<BufferedBlockMgr::Block*>* block_sequence, - bool* added) { +Status Sorter::Run::TryAddBlock(AddBlockMode mode, + vector<BufferedBlockMgr::Block*>* block_sequence, bool* added) { DCHECK(!block_sequence->empty()); - BufferedBlockMgr::Block* last_block = block_sequence->back(); - if (!is_sorted_) { - sorter_->sorted_data_size_->Add(last_block->valid_data_len()); - last_block = NULL; + BufferedBlockMgr::Block* prev_block; + if (mode == KEEP_PREV_PINNED) { + prev_block = NULL; } else { - // If the run is sorted, we will unpin the last block and extend the run. + DCHECK(mode == UNPIN_PREV); + // Swap the prev block with the next, to guarantee success. + prev_block = block_sequence->back(); } BufferedBlockMgr::Block* new_block; RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock( - sorter_->block_mgr_client_, last_block, &new_block)); + sorter_->block_mgr_client_, prev_block, &new_block)); if (new_block != NULL) { *added = true; block_sequence->push_back(new_block); } else { + DCHECK_EQ(mode, KEEP_PREV_PINNED); *added = false; } return Status::OK(); @@ -872,17 +979,136 @@ void Sorter::Run::CopyVarLenDataConvertOffset(const vector<StringValue*>& string } } -// Sorter::TupleSorter methods. +bool Sorter::Run::ConvertOffsetsToPtrs(Tuple* tuple) { + // We need to be careful to handle the case where var_len_blocks_ is empty, + // e.g. if all strings are NULL. + uint8_t* block_start = var_len_blocks_.empty() ? NULL : + var_len_blocks_[var_len_blocks_index_]->buffer(); + + const vector<SlotDescriptor*>& string_slots = sort_tuple_desc_->string_slots(); + for (int i = 0; i < string_slots.size(); ++i) { + SlotDescriptor* slot_desc = string_slots[i]; + if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; + + DCHECK(slot_desc->type().IsVarLenStringType()); + StringValue* value = reinterpret_cast<StringValue*>( + tuple->GetSlot(slot_desc->tuple_offset())); + // packed_offset includes the block index in the upper 32 bits and the block + // offset in the lower 32 bits. See CopyVarLenDataConvertOffset(). + uint64_t packed_offset = reinterpret_cast<uint64_t>(value->ptr); + int block_index = packed_offset >> 32; + int block_offset = packed_offset & 0xFFFFFFFF; + + if (block_index > var_len_blocks_index_) { + // We've reached the block boundary for the current var-len block. 
+ // This tuple will be returned in the next call to GetNext(). + DCHECK_GE(block_index, 0); + DCHECK_LE(block_index, var_len_blocks_.size()); + DCHECK_EQ(block_index, var_len_blocks_index_ + 1); + DCHECK_EQ(block_offset, 0); // The data is the first thing in the next block. + DCHECK_EQ(i, 0); // Var len data for tuple shouldn't be split across blocks. + return false; + } + + DCHECK_EQ(block_index, var_len_blocks_index_); + if (var_len_blocks_.empty()) { + DCHECK_EQ(value->len, 0); + } else { + DCHECK_LE(block_offset + value->len, var_len_blocks_[block_index]->valid_data_len()); + } + // Calculate the address implied by the offset and assign it. May be NULL for + // zero-length strings if there are no blocks in the run since block_start is NULL. + DCHECK(block_start != NULL || block_offset == 0); + value->ptr = reinterpret_cast<char*>(block_start + block_offset); + } + return true; +} + +int64_t Sorter::Run::TotalBytes() const { + int64_t total_bytes = 0; + for (BufferedBlockMgr::Block* block: fixed_len_blocks_) { + if (block != NULL) total_bytes += block->valid_data_len(); + } + + for (BufferedBlockMgr::Block* block: var_len_blocks_) { + if (block != NULL) total_bytes += block->valid_data_len(); + } + return total_bytes; +} + +Sorter::TupleIterator::TupleIterator(Sorter::Run* run, int64_t index) + : index_(index), tuple_(NULL) { + DCHECK(run->is_finalized_); + DCHECK_GE(index, 0); + DCHECK_LE(index, run->num_tuples()); + // If the run is empty, only index_ and tuple_ are initialized. + if (run->num_tuples() == 0) { + DCHECK_EQ(index, 0); + return; + } + + const int tuple_size = run->sort_tuple_size_; + int block_offset; + if (UNLIKELY(index == run->num_tuples())) { + // If the iterator is initialized past the end, set up buffer_start_index_, + // 'buffer_end_index_' and 'block_index_' for the last block, then set 'tuple' to + // 'tuple_size' bytes past the last tuple, so everything is correct when Prev() is + // invoked. + block_index_ = run->fixed_len_blocks_.size() - 1; + block_offset = ((index - 1) % run->block_capacity_) * tuple_size + tuple_size; + } else { + block_index_ = index / run->block_capacity_; + block_offset = (index % run->block_capacity_) * tuple_size; + } + buffer_start_index_ = block_index_ * run->block_capacity_; + buffer_end_index_ = buffer_start_index_ + run->block_capacity_; + tuple_ = run->fixed_len_blocks_[block_index_]->buffer() + block_offset; +} + +void Sorter::TupleIterator::Next(Sorter::Run* run, int tuple_size) { + DCHECK_LT(index_, run->num_tuples()) << "Can only advance one past end of run"; + tuple_ += tuple_size; + ++index_; + if (UNLIKELY(index_ >= buffer_end_index_)) NextBlock(run, tuple_size); +} + +void Sorter::TupleIterator::NextBlock(Sorter::Run* run, int tuple_size) { + // When moving after the last tuple, stay at the last block. 
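[Editorial sketch, not part of the patch] The TupleIterator constructor above reduces a flat tuple index to a (block, byte offset) pair with integer division and remainder. A standalone restatement (hypothetical function name):

    #include <cassert>
    #include <cstdint>

    // Tuple 'index' in a run with 'block_capacity' tuples per block lives in
    // block index / capacity, at byte offset (index % capacity) * tuple_size.
    void LocateTuple(int64_t index, int64_t block_capacity, int64_t tuple_size,
        int64_t* block_index, int64_t* block_offset) {
      *block_index = index / block_capacity;
      *block_offset = (index % block_capacity) * tuple_size;
    }

    int main() {
      int64_t block_index, block_offset;
      LocateTuple(700, 341, 24, &block_index, &block_offset);  // 341 tuples per block
      assert(block_index == 2 && block_offset == (700 - 2 * 341) * 24);
      return 0;
    }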
+ if (index_ >= run->num_tuples()) return;
+ ++block_index_;
+ DCHECK_LT(block_index_, run->fixed_len_blocks_.size());
+ buffer_start_index_ = block_index_ * run->block_capacity_;
+ DCHECK_EQ(index_, buffer_start_index_);
+ buffer_end_index_ = buffer_start_index_ + run->block_capacity_;
+ tuple_ = run->fixed_len_blocks_[block_index_]->buffer();
+}
+
+void Sorter::TupleIterator::Prev(Sorter::Run* run, int tuple_size) {
+ DCHECK_GE(index_, 0) << "Can only retreat one before start of run";
+ tuple_ -= tuple_size;
+ --index_;
+ if (UNLIKELY(index_ < buffer_start_index_)) PrevBlock(run, tuple_size);
+}
+
+void Sorter::TupleIterator::PrevBlock(Sorter::Run* run, int tuple_size) {
+ // When moving before the first tuple, stay at the first block.
+ if (index_ < 0) return;
+ --block_index_;
+ DCHECK_GE(block_index_, 0);
+ buffer_start_index_ = block_index_ * run->block_capacity_;
+ buffer_end_index_ = buffer_start_index_ + run->block_capacity_;
+ DCHECK_EQ(index_, buffer_end_index_ - 1);
+ int last_tuple_block_offset = run->sort_tuple_size_ * (run->block_capacity_ - 1);
+ tuple_ = run->fixed_len_blocks_[block_index_]->buffer() + last_tuple_block_offset;
+}
+
 Sorter::TupleSorter::TupleSorter(const TupleRowComparator& comp, int64_t block_size,
 int tuple_size, RuntimeState* state)
 : tuple_size_(tuple_size),
- block_capacity_(block_size / tuple_size),
- last_tuple_block_offset_(tuple_size * ((block_size / tuple_size) - 1)),
 comparator_(comp),
 num_comparisons_till_free_(state->batch_size()),
 state_(state) {
 temp_tuple_buffer_ = new uint8_t[tuple_size];
- temp_tuple_row_ = reinterpret_cast<TupleRow*>(&temp_tuple_buffer_);
 swap_buffer_ = new uint8_t[tuple_size];
 }

@@ -891,15 +1117,7 @@ Sorter::TupleSorter::~TupleSorter() {
 delete[] swap_buffer_;
 }

-Status Sorter::TupleSorter::Sort(Run* run) {
- run_ = run;
- RETURN_IF_ERROR(
- SortHelper(TupleIterator(this, 0), TupleIterator(this, run_->num_tuples_)));
- run->is_sorted_ = true;
- return Status::OK();
-}
-
-bool Sorter::TupleSorter::Less(TupleRow* lhs, TupleRow* rhs) {
+bool Sorter::TupleSorter::Less(const TupleRow* lhs, const TupleRow* rhs) {
 --num_comparisons_till_free_;
 DCHECK_GE(num_comparisons_till_free_, 0);
 if (UNLIKELY(num_comparisons_till_free_ == 0)) {
@@ -909,105 +1127,127 @@ bool Sorter::TupleSorter::Less(TupleRow* lhs, TupleRow* rhs) {
 return comparator_.Less(lhs, rhs);
 }

-// Sort the sequence of tuples from [first, last).
-// Begin with a sorted sequence of size 1 [first, first+1).
+Status Sorter::TupleSorter::Sort(Run* run) {
+ DCHECK(run->is_finalized());
+ DCHECK(!run->is_sorted());
+ run_ = run;
+ RETURN_IF_ERROR(SortHelper(TupleIterator::Begin(run_), TupleIterator::End(run_)));
+ run_->set_sorted();
+ return Status::OK();
+}
+
+// Sort the sequence of tuples from [begin, end).
+// Begin with a sorted sequence of size 1 [begin, begin+1).
// During each pass of the outermost loop, add the next tuple (at position 'i') to
// the sorted sequence by comparing it to each element of the sorted sequence
// (reverse order) to find its correct place in the sorted sequence, copying tuples
// along the way.
-Status Sorter::TupleSorter::InsertionSort(const TupleIterator& first,
- const TupleIterator& last) {
- TupleIterator insert_iter = first;
- insert_iter.Next();
- for (; insert_iter.index_ < last.index_; insert_iter.Next()) {
+Status Sorter::TupleSorter::InsertionSort(const TupleIterator& begin,
+ const TupleIterator& end) {
+ DCHECK_LT(begin.index(), end.index());
+
+ // Hoist member variable lookups out of loop to avoid extra loads inside loop.
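[Editorial sketch, not part of the patch] The loop that follows is an ordinary insertion sort, specialized for opaque fixed-size records that can only be moved with memcpy(). The same shape over a plain byte buffer (hypothetical names):

    #include <cstdint>
    #include <cstring>
    #include <vector>

    typedef bool (*LessFn)(const void* lhs, const void* rhs);

    void FlatInsertionSort(uint8_t* base, int num_records, int record_size,
        LessFn less) {
      std::vector<uint8_t> temp(record_size);
      for (int i = 1; i < num_records; ++i) {
        // Save the record being inserted; earlier records may be copied over it.
        memcpy(temp.data(), base + i * record_size, record_size);
        int j = i;
        // Shift larger records one slot to the right, then drop the saved record
        // into the gap.
        while (j > 0 && less(temp.data(), base + (j - 1) * record_size)) {
          memcpy(base + j * record_size, base + (j - 1) * record_size, record_size);
          --j;
        }
        memcpy(base + j * record_size, temp.data(), record_size);
      }
    }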
+ Run* run = run_;
+ int tuple_size = tuple_size_;
+ uint8_t* temp_tuple_buffer = temp_tuple_buffer_;
+
+ TupleIterator insert_iter = begin;
+ insert_iter.Next(run, tuple_size);
+ for (; insert_iter.index() < end.index(); insert_iter.Next(run, tuple_size)) {
 // insert_iter points to the tuple after the currently sorted sequence that must
- // be inserted into the sorted sequence. Copy to temp_tuple_row_ since it may be
+ // be inserted into the sorted sequence. Copy to temp_tuple_buffer_ since it may be
 // overwritten by the one at position 'insert_iter - 1'
- memcpy(temp_tuple_buffer_, insert_iter.current_tuple_, tuple_size_);
+ memcpy(temp_tuple_buffer, insert_iter.tuple(), tuple_size);

- // 'iter' points to the tuple that temp_tuple_row_ will be compared to.
- // 'copy_to' is the where iter should be copied to if it is >= temp_tuple_row_.
+ // 'iter' points to the tuple that temp_tuple_buffer will be compared to.
+ // 'copy_to' is where 'iter' should be copied to if it is >= temp_tuple_buffer.
 // 'copy_to' always points to the next row after 'iter'.
 TupleIterator iter = insert_iter;
- iter.Prev();
- uint8_t* copy_to = insert_iter.current_tuple_;
- while (Less(temp_tuple_row_, reinterpret_cast<TupleRow*>(&iter.current_tuple_))) {
- memcpy(copy_to, iter.current_tuple_, tuple_size_);
- copy_to = iter.current_tuple_;
- // Break if 'iter' has reached the first row, meaning that temp_tuple_row_
- // will be inserted in position 'first'
- if (iter.index_ <= first.index_) break;
- iter.Prev();
+ iter.Prev(run, tuple_size);
+ Tuple* copy_to = insert_iter.tuple();
+ while (Less(reinterpret_cast<TupleRow*>(&temp_tuple_buffer), iter.row())) {
+ memcpy(copy_to, iter.tuple(), tuple_size);
+ copy_to = iter.tuple();
+ // Break if 'iter' has reached the first row, meaning that the temp row
+ // will be inserted in position 'begin'
+ if (iter.index() <= begin.index()) break;
+ iter.Prev(run, tuple_size);
 }
- memcpy(copy_to, temp_tuple_buffer_, tuple_size_);
+ memcpy(copy_to, temp_tuple_buffer, tuple_size);
 }
 RETURN_IF_CANCELLED(state_);
 RETURN_IF_ERROR(state_->GetQueryStatus());
 return Status::OK();
 }

-Status Sorter::TupleSorter::Partition(TupleIterator first, TupleIterator last,
- Tuple* pivot, Sorter::TupleSorter::TupleIterator* cut) {
- // Copy pivot into temp_tuple since it points to a tuple within [first, last).
- memcpy(temp_tuple_buffer_, pivot, tuple_size_);
-
- last.Prev();
+Status Sorter::TupleSorter::Partition(TupleIterator begin,
+ TupleIterator end, const Tuple* pivot, TupleIterator* cut) {
+ // Hoist member variable lookups out of loop to avoid extra loads inside loop.
+ Run* run = run_;
+ int tuple_size = tuple_size_;
+ Tuple* temp_tuple = reinterpret_cast<Tuple*>(temp_tuple_buffer_);
+ Tuple* swap_tuple = reinterpret_cast<Tuple*>(swap_buffer_);
+
+ // Copy pivot into temp_tuple since it points to a tuple within [begin, end).
+ DCHECK(temp_tuple != NULL);
+ DCHECK(pivot != NULL);
+ memcpy(temp_tuple, pivot, tuple_size);
+
+ TupleIterator left = begin;
+ TupleIterator right = end;
+ right.Prev(run, tuple_size); // Set 'right' to the last tuple in range.
 while (true) {
 // Search for the first and last out-of-place elements, and swap them.
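[Editorial sketch, not part of the patch] The scan-and-swap loop below is a Hoare-style partition. The same control flow over plain ints, with the pivot taken by value just as Partition() copies it into 'temp_tuple':

    #include <algorithm>

    // Returns the cut point: elements in [begin, cut) are <= pivot and elements in
    // [cut, end) are >= pivot. 'pivot' must be the value of some element in range,
    // which keeps both scans in bounds.
    int* HoarePartition(int* begin, int* end, int pivot) {
      int* left = begin;
      int* right = end - 1;  // last element in range
      while (true) {
        while (*left < pivot) ++left;    // first out-of-place element from the left
        while (pivot < *right) --right;  // last out-of-place element from the right
        if (left >= right) return left;
        std::swap(*left, *right);
        ++left;
        --right;
      }
    }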
- while (Less(reinterpret_cast<TupleRow*>(&first.current_tuple_), temp_tuple_row_)) { - first.Next(); + while (Less(left.row(), reinterpret_cast<TupleRow*>(&temp_tuple))) { + left.Next(run, tuple_size); } - while (Less(temp_tuple_row_, reinterpret_cast<TupleRow*>(&last.current_tuple_))) { - last.Prev(); + while (Less(reinterpret_cast<TupleRow*>(&temp_tuple), right.row())) { + right.Prev(run, tuple_size); } - if (first.index_ >= last.index_) break; + if (left.index() >= right.index()) break; // Swap first and last tuples. - Swap(first.current_tuple_, last.current_tuple_); + Swap(left.tuple(), right.tuple(), swap_tuple, tuple_size); - first.Next(); - last.Prev(); + left.Next(run, tuple_size); + right.Prev(run, tuple_size); RETURN_IF_CANCELLED(state_); RETURN_IF_ERROR(state_->GetQueryStatus()); } - *cut = first; + *cut = left; return Status::OK(); } -Status Sorter::TupleSorter::SortHelper(TupleIterator first, TupleIterator last) { +Status Sorter::TupleSorter::SortHelper(TupleIterator begin, TupleIterator end) { // Use insertion sort for smaller sequences. - while (last.index_ - first.index_ > INSERTION_THRESHOLD) { - // Select a pivot and call Partition() to split the tuples in [first, last) into two + while (end.index() - begin.index() > INSERTION_THRESHOLD) { + // Select a pivot and call Partition() to split the tuples in [begin, end) into two // groups (<= pivot and >= pivot) in-place. 'cut' is the index of the first tuple in // the second group. - Tuple* pivot = SelectPivot(first, last); + Tuple* pivot = SelectPivot(begin, end); TupleIterator cut; - RETURN_IF_ERROR(Partition(first, last, pivot, &cut)); + RETURN_IF_ERROR(Partition(begin, end, pivot, &cut)); // Recurse on the smaller partition. This limits stack size to log(n) stack frames. - if (cut.index_ - first.index_ < last.index_ - cut.index_) { + if (cut.index() - begin.index() < end.index() - cut.index()) { // Left partition is smaller. - RETURN_IF_ERROR(SortHelper(first, cut)); - first = cut; + RETURN_IF_ERROR(SortHelper(begin, cut)); + begin = cut; } else { // Right partition is equal or smaller. - RETURN_IF_ERROR(SortHelper(cut, last)); - last = cut; + RETURN_IF_ERROR(SortHelper(cut, end)); + end = cut; } - - RETURN_IF_ERROR(SortHelper(cut, last)); - last = cut; - - RETURN_IF_CANCELLED(state_); } - RETURN_IF_ERROR(InsertionSort(first, last)); + + if (begin.index() < end.index()) RETURN_IF_ERROR(InsertionSort(begin, end)); return Status::OK(); } -Tuple* Sorter::TupleSorter::SelectPivot(TupleIterator first, TupleIterator last) { +Tuple* Sorter::TupleSorter::SelectPivot(TupleIterator begin, TupleIterator end) { // Select the median of three random tuples. The random selection avoids pathological // behaviour associated with techniques that pick a fixed element (e.g. picking // first/last/middle element) and taking the median tends to help us select better @@ -1022,10 +1262,10 @@ Tuple* Sorter::TupleSorter::SelectPivot(TupleIterator first, TupleIterator last) // iterations. 
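[Editorial sketch, not part of the patch] The sampling below, restated over plain ints (hypothetical helper; std::mt19937 stands in for the sorter's rng_):

    #include <algorithm>
    #include <random>

    // Sample three random elements of values[begin, end) and return their median.
    int SelectPivotValue(const int* values, int begin, int end, std::mt19937* rng) {
      std::uniform_int_distribution<int> dist(begin, end - 1);
      int a = values[dist(*rng)];
      int b = values[dist(*rng)];
      int c = values[dist(*rng)];
      // Median of three without enumerating all six orderings.
      return std::max(std::min(a, b), std::min(std::max(a, b), c));
    }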
Tuple* tuples[3]; for (int i = 0; i < 3; ++i) { - int64_t index = uniform_int<int64_t>(first.index_, last.index_ - 1)(rng_); - TupleIterator iter(this, index); - DCHECK(iter.current_tuple_ != NULL); - tuples[i] = reinterpret_cast<Tuple*>(iter.current_tuple_); + int64_t index = uniform_int<int64_t>(begin.index(), end.index() - 1)(rng_); + TupleIterator iter(run_, index); + DCHECK(iter.tuple() != NULL); + tuples[i] = iter.tuple(); } return MedianOfThree(tuples[0], tuples[1], tuples[2]); @@ -1067,13 +1307,13 @@ Tuple* Sorter::TupleSorter::MedianOfThree(Tuple* t1, Tuple* t2, Tuple* t3) { } } -inline void Sorter::TupleSorter::Swap(uint8_t* left, uint8_t* right) { - memcpy(swap_buffer_, left, tuple_size_); - memcpy(left, right, tuple_size_); - memcpy(right, swap_buffer_, tuple_size_); +inline void Sorter::TupleSorter::Swap(Tuple* left, Tuple* right, Tuple* swap_tuple, + int tuple_size) { + memcpy(swap_tuple, left, tuple_size); + memcpy(left, right, tuple_size); + memcpy(right, swap_tuple, tuple_size); } -// Sorter methods Sorter::Sorter(const TupleRowComparator& compare_less_than, const vector<ExprContext*>& slot_materialize_expr_ctxs, RowDescriptor* output_row_desc, MemTracker* mem_tracker, @@ -1096,17 +1336,9 @@ Sorter::Sorter(const TupleRowComparator& compare_less_than, } Sorter::~Sorter() { - // Delete blocks from the block mgr. - for (deque<Run*>::iterator it = sorted_runs_.begin(); - it != sorted_runs_.end(); ++it) { - (*it)->DeleteAllBlocks(); - } - for (deque<Run*>::iterator it = merging_runs_.begin(); - it != merging_runs_.end(); ++it) { - (*it)->DeleteAllBlocks(); - } - if (unsorted_run_ != NULL) unsorted_run_->DeleteAllBlocks(); - block_mgr_->ClearReservations(block_mgr_client_); + DCHECK(sorted_runs_.empty()); + DCHECK(merging_runs_.empty()); + DCHECK(unsorted_run_ == NULL); } Status Sorter::Init() { @@ -1118,6 +1350,7 @@ Status Sorter::Init() { unsorted_run_ = obj_pool_.Add(new Run(this, sort_tuple_desc, true)); initial_runs_counter_ = ADD_COUNTER(profile_, "InitialRunsCreated", TUnit::UNIT); + spilled_runs_counter_ = ADD_COUNTER(profile_, "SpilledRuns", TUnit::UNIT); num_merges_counter_ = ADD_COUNTER(profile_, "TotalMergesPerformed", TUnit::UNIT); in_mem_sort_timer_ = ADD_TIMER(profile_, "InMemorySortTime"); sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES); @@ -1125,9 +1358,8 @@ Status Sorter::Init() { int min_blocks_required = BLOCKS_REQUIRED_FOR_MERGE; // Fixed and var-length blocks are separate, so we need BLOCKS_REQUIRED_FOR_MERGE // blocks for both if there is var-length data. - if (output_row_desc_->tuple_descriptors()[0]->HasVarlenSlots()) { - min_blocks_required *= 2; - } + if (has_var_len_slots_) min_blocks_required *= 2; + RETURN_IF_ERROR(block_mgr_->RegisterClient(Substitute("Sorter ptr=$0", this), min_blocks_required, false, mem_tracker_, state_, &block_mgr_client_)); @@ -1142,17 +1374,12 @@ Status Sorter::AddBatch(RowBatch* batch) { int num_processed = 0; int cur_batch_index = 0; while (cur_batch_index < batch->num_rows()) { - if (has_var_len_slots_) { - RETURN_IF_ERROR(unsorted_run_->AddBatch<true>( - batch, cur_batch_index, &num_processed)); - } else { - RETURN_IF_ERROR(unsorted_run_->AddBatch<false>( - batch, cur_batch_index, &num_processed)); - } + RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, cur_batch_index, &num_processed)); + cur_batch_index += num_processed; if (cur_batch_index < batch->num_rows()) { // The current run is full. Sort it and begin the next one. 
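[Editorial sketch, not part of the patch] The surrounding loop follows the standard run-formation pattern: append rows to the current run until it is full, sort the run and set it aside, then continue the same batch into a fresh run. A toy model with ints in place of tuples (all names hypothetical; assumes 'current' is not already full on entry):

    #include <algorithm>
    #include <vector>

    struct ToyRun {
      std::vector<int> rows;
      size_t capacity;
    };

    // Consume 'batch' into runs of at most 'capacity' rows; each full run is
    // sorted (cf. SortCurrentInputRun()) and moved to 'sorted_runs'.
    void AddBatchToRuns(const std::vector<int>& batch, size_t capacity,
        std::vector<ToyRun>* sorted_runs, ToyRun* current) {
      size_t consumed = 0;
      while (consumed < batch.size()) {
        size_t n = std::min(batch.size() - consumed, capacity - current->rows.size());
        current->rows.insert(current->rows.end(), batch.begin() + consumed,
            batch.begin() + consumed + n);
        consumed += n;
        if (current->rows.size() == capacity) {
          std::sort(current->rows.begin(), current->rows.end());
          sorted_runs->push_back(*current);
          current->rows.clear();
        }
      }
    }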
- RETURN_IF_ERROR(SortRun()); + RETURN_IF_ERROR(SortCurrentInputRun()); RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks()); unsorted_run_ = obj_pool_.Add( new Run(this, output_row_desc_->tuple_descriptors()[0], true)); @@ -1163,120 +1390,97 @@ Status Sorter::AddBatch(RowBatch* batch) { } Status Sorter::InputDone() { - // Sort the tuples accumulated so far in the current run. - RETURN_IF_ERROR(SortRun()); + // Sort the tuples in the last run. + RETURN_IF_ERROR(SortCurrentInputRun()); if (sorted_runs_.size() == 1) { // The entire input fit in one run. Read sorted rows in GetNext() directly // from the sorted run. RETURN_IF_ERROR(sorted_runs_.back()->PrepareRead()); - } else { - // At least one merge is necessary. - int blocks_per_run = has_var_len_slots_ ? 2 : 1; - int min_buffers_for_merge = sorted_runs_.size() * blocks_per_run; - // Check if the final run needs to be unpinned. - bool unpinned_final = false; - if (block_mgr_->num_free_buffers() < min_buffers_for_merge - blocks_per_run) { - // Number of available buffers is less than the size of the final run and - // the buffers needed to read the remainder of the runs in memory. - // Unpin the final run. - RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks()); - unpinned_final = true; - } else { - // No need to unpin the current run. There is enough memory to stream the - // other runs. - // TODO: revisit. It might be better to unpin some from this run if it means - // we can get double buffering in the other runs. - } + return Status::OK(); + } - // For an intermediate merge, intermediate_merge_batch contains deep-copied rows from - // the input runs. If (unmerged_sorted_runs_.size() > max_runs_per_final_merge), - // one or more intermediate merges are required. - // TODO: Attempt to allocate more memory before doing intermediate merges. This may - // be possible if other operators have relinquished memory after the sort has built - // its runs. - if (min_buffers_for_merge > block_mgr_->available_allocated_buffers()) { - DCHECK(unpinned_final); - RETURN_IF_ERROR(MergeIntermediateRuns()); - } + // At least one merge is necessary. + int blocks_per_run = has_var_len_slots_ ? 2 : 1; + int min_buffers_for_merge = sorted_runs_.size() * blocks_per_run; + // Check if the final run needs to be unpinned. + bool unpinned_final = false; + if (block_mgr_->num_free_buffers() < min_buffers_for_merge - blocks_per_run) { + // Number of available buffers is less than the size of the final run and + // the buffers needed to read the remainder of the runs in memory. + // Unpin the final run. + RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks()); + unpinned_final = true; + } else { + // No need to unpin the current run. There is enough memory to stream the + // other runs. + // TODO: revisit. It might be better to unpin some from this run if it means + // we can get double buffering in the other runs. + } - // Create the final merger. - RETURN_IF_ERROR(CreateMerger(sorted_runs_.size())); + // For an intermediate merge, intermediate_merge_batch contains deep-copied rows from + // the input runs. If (unmerged_sorted_runs_.size() > max_runs_per_final_merge), + // one or more intermediate merges are required. + // TODO: Attempt to allocate more memory before doing intermediate merges. This may + // be possible if other operators have relinquished memory after the sort has built + // its runs. 
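[Editorial sketch, not part of the patch] The buffer arithmetic above and the two tests around this point can be factored into one standalone function (hypothetical names, same logic):

    // Each input run of a merge needs one pinned block to stream from, or two
    // when fixed-len and var-len data live in separate blocks.
    struct MergePlan {
      bool unpin_final_run;    // free buffers can't also cover the final run
      bool need_intermediate;  // can't merge every run in a single pass
    };

    MergePlan PlanMerge(int num_runs, bool has_var_len_slots, int free_buffers,
        int allocated_buffers) {
      int blocks_per_run = has_var_len_slots ? 2 : 1;
      int min_buffers_for_merge = num_runs * blocks_per_run;
      MergePlan plan;
      plan.unpin_final_run = free_buffers < min_buffers_for_merge - blocks_per_run;
      plan.need_intermediate = min_buffers_for_merge > allocated_buffers;
      return plan;
    }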
+ if (min_buffers_for_merge > block_mgr_->available_allocated_buffers()) { + DCHECK(unpinned_final); + RETURN_IF_ERROR(MergeIntermediateRuns()); } + + // Create the final merger. + RETURN_IF_ERROR(CreateMerger(sorted_runs_.size())); return Status::OK(); } Status Sorter::GetNext(RowBatch* output_batch, bool* eos) { if (sorted_runs_.size() == 1) { - DCHECK(sorted_runs_.back()->is_pinned_); - // In this case, only TupleRows are copied into output_batch. Sorted tuples are left - // in the pinned blocks in the single sorted run. - RETURN_IF_ERROR(sorted_runs_.back()->GetNext<false>(output_batch, eos)); - if (*eos) sorted_runs_.back()->TransferResources(output_batch); + DCHECK(sorted_runs_.back()->is_pinned()); + return sorted_runs_.back()->GetNext<false>(output_batch, eos); } else { - // In this case, rows are deep copied into output_batch. - RETURN_IF_ERROR(merger_->GetNext(output_batch, eos)); + return merger_->GetNext(output_batch, eos); } - return Status::OK(); } Status Sorter::Reset() { + DCHECK(unsorted_run_ == NULL); merger_.reset(); - merging_runs_.clear(); - sorted_runs_.clear(); + // Free resources from the current runs. + CleanupAllRuns(); obj_pool_.Clear(); - DCHECK(unsorted_run_ == NULL); unsorted_run_ = obj_pool_.Add( new Run(this, output_row_desc_->tuple_descriptors()[0], true)); RETURN_IF_ERROR(unsorted_run_->Init()); return Status::OK(); } -Status Sorter::SortRun() { - BufferedBlockMgr::Block* last_block = unsorted_run_->fixed_len_blocks_.back(); - if (last_block->valid_data_len() > 0) { - sorted_data_size_->Add(last_block->valid_data_len()); - } else { - last_block->Delete(); - unsorted_run_->fixed_len_blocks_.pop_back(); - } - if (has_var_len_slots_) { - DCHECK(unsorted_run_->var_len_copy_block_ != NULL); - last_block = unsorted_run_->var_len_blocks_.back(); - if (last_block->valid_data_len() > 0) { - sorted_data_size_->Add(last_block->valid_data_len()); - } else { - last_block->Delete(); - unsorted_run_->var_len_blocks_.pop_back(); - if (unsorted_run_->var_len_blocks_.size() == 0) { - unsorted_run_->var_len_copy_block_->Delete(); - unsorted_run_->var_len_copy_block_ = NULL; - } - } - } +void Sorter::Close() { + CleanupAllRuns(); + block_mgr_->ClearReservations(block_mgr_client_); + obj_pool_.Clear(); +} + +void Sorter::CleanupAllRuns() { + Run::CleanupRuns(&sorted_runs_); + Run::CleanupRuns(&merging_runs_); + if (unsorted_run_ != NULL) unsorted_run_->DeleteAllBlocks(); + unsorted_run_ = NULL; +} + +Status Sorter::SortCurrentInputRun() { + RETURN_IF_ERROR(unsorted_run_->FinalizeInput()); + { SCOPED_TIMER(in_mem_sort_timer_); RETURN_IF_ERROR(in_mem_tuple_sorter_->Sort(unsorted_run_)); } sorted_runs_.push_back(unsorted_run_); + sorted_data_size_->Add(unsorted_run_->TotalBytes()); unsorted_run_ = NULL; - return Status::OK(); -} -uint64_t Sorter::EstimateMergeMem(uint64_t available_blocks, - RowDescriptor* row_desc, int merge_batch_size) { - bool has_var_len_slots = row_desc->tuple_descriptors()[0]->HasVarlenSlots(); - int blocks_per_run = has_var_len_slots ? 2 : 1; - int max_input_runs_per_merge = (available_blocks / blocks_per_run) - 1; - // During a merge, the batches corresponding to the input runs contain only TupleRows. - // (The data itself is in pinned blocks held by the run) - uint64_t input_batch_mem = - merge_batch_size * sizeof(Tuple*) * max_input_runs_per_merge; - // Since rows are deep copied into the output batch for the merger, use a pessimistic - // estimate of the memory required. 
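[Editorial sketch, not part of the patch] For reference, the estimate this hunk deletes amounts to the following arithmetic ('output_batch_mem' stands in for RowBatch::AT_CAPACITY_MEM_USAGE):

    #include <cstdint>

    uint64_t EstimateMergeMemSketch(uint64_t available_blocks, bool has_var_len_slots,
        int merge_batch_size, uint64_t output_batch_mem) {
      int blocks_per_run = has_var_len_slots ? 2 : 1;
      int max_input_runs_per_merge = available_blocks / blocks_per_run - 1;
      // Input batches hold only row pointers; the data stays in pinned blocks.
      uint64_t input_batch_mem = merge_batch_size * sizeof(void*) *
          static_cast<uint64_t>(max_input_runs_per_merge);
      // Rows are deep copied into the merger's output batch, so a pessimistic
      // full-batch estimate is added for it.
      return input_batch_mem + output_batch_mem;
    }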
- uint64_t output_batch_mem = RowBatch::AT_CAPACITY_MEM_USAGE;
-
- return input_batch_mem + output_batch_mem;
+ RETURN_IF_CANCELLED(state_);
+ return Status::OK();
 }

 Status Sorter::MergeIntermediateRuns() {
@@ -1294,16 +1498,19 @@ Status Sorter::MergeIntermediateRuns() {
 scoped_ptr<RowBatch> intermediate_merge_batch;
 while (sorted_runs_.size() > max_runs_per_final_merge) {
 // An intermediate merge adds one merge to unmerged_sorted_runs_.
- // Merging 'runs - (max_runs_final_ - 1)' number of runs is sifficient to guarantee
+ // Merging 'runs - (max_runs_final_ - 1)' number of runs is sufficient to guarantee
 // that the final merge can be performed.
 int num_runs_to_merge = min<int>(max_runs_per_intermediate_merge,
 sorted_runs_.size() - max_runs_per_intermediate_merge);
 RETURN_IF_ERROR(CreateMerger(num_runs_to_merge));
 RowBatch intermediate_merge_batch(*output_row_desc_, state_->batch_size(),
 mem_tracker_);
- // merged_run is the new sorted run that is produced by the intermediate merge.
+ // 'merged_run' is the new sorted run that is produced by the intermediate merge.
+ // We add 'merged_run' to 'sorted_runs_' immediately so that it is cleaned up
+ // in Close().
 Run* merged_run = obj_pool_.Add(
 new Run(this, output_row_desc_->tuple_descriptors()[0], false));
+ sorted_runs_.push_back(merged_run);
 RETURN_IF_ERROR(merged_run->Init());
 bool eos = false;
 while (!eos) {
@@ -1311,38 +1518,14 @@ Status Sorter::MergeIntermediateRuns() {
 int num_copied;
 RETURN_IF_CANCELLED(state_);
 RETURN_IF_ERROR(merger_->GetNext(&intermediate_merge_batch, &eos));
- Status ret_status;
- if (has_var_len_slots_) {
- ret_status = merged_run->AddBatch<true>(&intermediate_merge_batch,
- 0, &num_copied);
- } else {
- ret_status = merged_run->AddBatch<false>(&intermediate_merge_batch,
- 0, &num_copied);
- }
- if (!ret_status.ok()) return ret_status;
+ RETURN_IF_ERROR(
+ merged_run->AddIntermediateBatch(&intermediate_merge_batch, 0, &num_copied));
 DCHECK_EQ(num_copied, intermediate_merge_batch.num_rows());
 intermediate_merge_batch.Reset();
 }
- BufferedBlockMgr::Block* last_block = merged_run->fixed_len_blocks_.back();
- if (last_block->valid_data_len() > 0) {
- RETURN_IF_ERROR(last_block->Unpin());
- } else {
- last_block->Delete();
- merged_run->fixed_len_blocks_.pop_back();
- }
- if (has_var_len_slots_) {
- last_block = merged_run->var_len_blocks_.back();
- if (last_block->valid_data_len() > 0) {
- RETURN_IF_ERROR(last_block->Unpin());
- } else {
- last_block->Delete();
- merged_run->var_len_blocks_.pop_back();
- }
- }
- merged_run->is_pinned_ = false;
- sorted_runs_.push_back(merged_run);
+ RETURN_IF_ERROR(merged_run->FinalizeInput());
 }

 return Status::OK();
@@ -1350,13 +1533,12 @@ Status Sorter::MergeIntermediateRuns() {

 Status Sorter::CreateMerger(int num_runs) {
 DCHECK_GT(num_runs, 1);
- // Clean up the runs from the previous merge.
- for (deque<Run*>::iterator it = merging_runs_.begin();
- it != merging_runs_.end(); ++it) {
- (*it)->DeleteAllBlocks();
- }
- merging_runs_.clear();
+ Run::CleanupRuns(&merging_runs_);
+
+ // TODO: 'deep_copy_input' is set to true, which forces the merger to copy all rows
+ // from the runs being merged. This is unnecessary overhead that can be avoided if
+ // we correctly transfer resources.
 merger_.reset(
 new SortedRunMerger(compare_less_than_, output_row_desc_, profile_, true));
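[Editorial sketch, not part of the patch] The loop in MergeIntermediateRuns() above keeps collapsing groups of runs into single merged runs until one final merge suffices. Its convergence can be modelled as below, assuming max_runs_per_intermediate_merge is at least 2 and no greater than max_runs_per_final_merge - 1 (so each pass merges at least two runs); the exact relationship between the two limits is defined outside this hunk:

    #include <algorithm>
    #include <cassert>

    int CountMergePasses(int num_runs, int max_runs_per_final_merge,
        int max_runs_per_intermediate_merge) {
      int passes = 0;
      while (num_runs > max_runs_per_final_merge) {
        // Mirrors num_runs_to_merge in the loop above.
        int merged = std::min(max_runs_per_intermediate_merge,
            num_runs - max_runs_per_intermediate_merge);
        assert(merged >= 2);     // guaranteed by the assumption stated above
        num_runs -= merged - 1;  // 'merged' runs collapse into one new run
        ++passes;
      }
      return passes;
    }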
