IMPALA-3332: Free local allocations in sorter. The sorter can have runaway memory consumption because it never frees local allocations made in comparator_.Less(). In addition, it doesn't check for errors generated during expression evaluation, so it may keep sorting even after failures have occurred.
This change fixes the problem by freeing local allocations for every n invocations of comparator_.Less() where n is the row batch size specified in the query options. Various error checks are also added to return early if any error is encountered. Change-Id: I941729b4836e5dbb827d4313a0b45bc5df2fa8e1 Reviewed-on: http://gerrit.cloudera.org:8080/3116 Reviewed-by: Michael Ho <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f7501d2e Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f7501d2e Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f7501d2e Branch: refs/heads/master Commit: f7501d2ec18feb2fd8e12cd7de3b9b9726085b54 Parents: 38416ee Author: Michael Ho <[email protected]> Authored: Wed May 18 10:56:52 2016 -0700 Committer: Tim Armstrong <[email protected]> Committed: Mon May 23 08:40:18 2016 -0700 ---------------------------------------------------------------------- be/src/runtime/sorted-run-merger.cc | 2 + be/src/runtime/sorter.cc | 117 ++++++++++++------- be/src/util/tuple-row-compare.h | 16 ++- .../queries/QueryTest/spilling.test | 8 +- 4 files changed, 91 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7501d2e/be/src/runtime/sorted-run-merger.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc index ea7a689..0376437 100644 --- a/be/src/runtime/sorted-run-merger.cc +++ b/be/src/runtime/sorted-run-merger.cc @@ -180,6 +180,8 @@ Status SortedRunMerger::GetNext(RowBatch* output_batch, bool* eos) { Heapify(0); } + // Free local allocations made by comparator_.Less(); + comparator_.FreeLocalAllocations(); *eos = min_heap_.empty(); return Status::OK(); 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7501d2e/be/src/runtime/sorter.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc index 96c27df..e0d388a 100644 --- a/be/src/runtime/sorter.cc +++ b/be/src/runtime/sorter.cc @@ -227,10 +227,9 @@ class Sorter::TupleSorter { ~TupleSorter(); /// Performs a quicksort for tuples in 'run' followed by an insertion sort to - /// finish smaller blocks. - /// Returns early if stste_->is_cancelled() is true. No status - /// is returned - the caller must check for cancellation. - void Sort(Run* run); + /// finish smaller blocks. Returns an error status if any error is encountered or + /// if the query is cancelled. + Status Sort(Run* run); private: static const int INSERTION_THRESHOLD = 16; @@ -266,6 +265,12 @@ class Sorter::TupleSorter { current_tuple_ = buffer_start_ + block_offset + past_end_bytes; } + /// Default constructor used for local variable. + TupleIterator() + : parent_(NULL), + index_(-1), + current_tuple_(NULL) { } + /// Sets 'current_tuple_' to point to the next tuple in the run. Increments 'block_index_' /// and advances to the next block if the next tuple is in the next block. /// Can be advanced one past the last tuple in the run, but is not valid to @@ -327,6 +332,10 @@ class Sorter::TupleSorter { /// Tuple comparator with method Less() that returns true if lhs < rhs. const TupleRowComparator comparator_; + /// Number of times comparator_.Less() can be invoked again before + /// comparator_.FreeLocalAllocations() needs to be called. + int num_comparisons_till_free_; + /// Runtime state instance to check for cancellation. Not owned. RuntimeState* const state_; @@ -344,20 +353,27 @@ class Sorter::TupleSorter { /// high: Mersenne Twister should be more than adequate. mt19937_64 rng_; - /// Perform an insertion sort for rows in the range [first, last) in a run. 
- void InsertionSort(const TupleIterator& first, const TupleIterator& last); + /// Wrapper around comparator_.Less(). Also call comparator_.FreeLocalAllocations() + /// on every 'state_->batch_size()' invocations of comparator_.Less(). Returns true + /// if 'lhs' is less than 'rhs'. + bool Less(TupleRow* lhs, TupleRow* rhs); + + /// Performs an insertion sort for rows in the range [first, last) in a run. + /// Returns an error status if there is any error or if the query is cancelled. + Status InsertionSort(const TupleIterator& first, const TupleIterator& last); /// Partitions the sequence of tuples in the range [first, last) in a run into two /// groups around the pivot tuple - i.e. tuples in first group are <= the pivot, and /// tuples in the second group are >= pivot. Tuples are swapped in place to create the - /// groups and the index to the first element in the second group is returned. - /// Checks state_->is_cancelled() and returns early with an invalid result if true. - TupleIterator Partition(TupleIterator first, TupleIterator last, Tuple* pivot); + /// groups and the index to the first element in the second group is returned in 'cut'. + /// Return an error status if any error is encountered or if the query is cancelled. + Status Partition(TupleIterator first, TupleIterator last, Tuple* pivot, + TupleIterator* cut); /// Performs a quicksort of rows in the range [first, last) followed by insertion sort - /// for smaller groups of elements. - /// Checks state_->is_cancelled() and returns early if true. - void SortHelper(TupleIterator first, TupleIterator last); + /// for smaller groups of elements. Return an error status for any errors or if the + /// query is cancelled. + Status SortHelper(TupleIterator first, TupleIterator last); /// Select a pivot to partition [first, last). 
Tuple* SelectPivot(TupleIterator first, TupleIterator last); @@ -582,7 +598,7 @@ Status Sorter::Run::UnpinAllBlocks() { cur_sorted_var_len_block = sorted_var_len_blocks.back(); } uint8_t* var_data_ptr = - cur_sorted_var_len_block->Allocate<uint8_t>(total_var_len); + cur_sorted_var_len_block->Allocate<uint8_t>(total_var_len); DCHECK_EQ(sorted_var_len_blocks.back(), cur_sorted_var_len_block); CopyVarLenDataConvertOffset(string_values, sorted_var_len_blocks.size() - 1, reinterpret_cast<uint8_t*>(cur_sorted_var_len_block->buffer()), var_data_ptr); @@ -863,6 +879,7 @@ Sorter::TupleSorter::TupleSorter(const TupleRowComparator& comp, int64_t block_s block_capacity_(block_size / tuple_size), last_tuple_block_offset_(tuple_size * ((block_size / tuple_size) - 1)), comparator_(comp), + num_comparisons_till_free_(state->batch_size()), state_(state) { temp_tuple_buffer_ = new uint8_t[tuple_size]; temp_tuple_row_ = reinterpret_cast<TupleRow*>(&temp_tuple_buffer_); @@ -874,10 +891,22 @@ Sorter::TupleSorter::~TupleSorter() { delete[] swap_buffer_; } -void Sorter::TupleSorter::Sort(Run* run) { +Status Sorter::TupleSorter::Sort(Run* run) { run_ = run; - SortHelper(TupleIterator(this, 0), TupleIterator(this, run_->num_tuples_)); + RETURN_IF_ERROR( + SortHelper(TupleIterator(this, 0), TupleIterator(this, run_->num_tuples_))); run->is_sorted_ = true; + return Status::OK(); +} + +bool Sorter::TupleSorter::Less(TupleRow* lhs, TupleRow* rhs) { + --num_comparisons_till_free_; + DCHECK_GE(num_comparisons_till_free_, 0); + if (UNLIKELY(num_comparisons_till_free_ == 0)) { + comparator_.FreeLocalAllocations(); + num_comparisons_till_free_ = state_->batch_size(); + } + return comparator_.Less(lhs, rhs); } // Sort the sequence of tuples from [first, last). @@ -886,7 +915,7 @@ void Sorter::TupleSorter::Sort(Run* run) { // the sorted sequence by comparing it to each element of the sorted sequence // (reverse order) to find its correct place in the sorted sequence, copying tuples // along the way. 
-void Sorter::TupleSorter::InsertionSort(const TupleIterator& first, +Status Sorter::TupleSorter::InsertionSort(const TupleIterator& first, const TupleIterator& last) { TupleIterator insert_iter = first; insert_iter.Next(); @@ -902,8 +931,7 @@ void Sorter::TupleSorter::InsertionSort(const TupleIterator& first, TupleIterator iter = insert_iter; iter.Prev(); uint8_t* copy_to = insert_iter.current_tuple_; - while (comparator_.Less( - temp_tuple_row_, reinterpret_cast<TupleRow*>(&iter.current_tuple_))) { + while (Less(temp_tuple_row_, reinterpret_cast<TupleRow*>(&iter.current_tuple_))) { memcpy(copy_to, iter.current_tuple_, tuple_size_); copy_to = iter.current_tuple_; // Break if 'iter' has reached the first row, meaning that temp_tuple_row_ @@ -914,22 +942,23 @@ void Sorter::TupleSorter::InsertionSort(const TupleIterator& first, memcpy(copy_to, temp_tuple_buffer_, tuple_size_); } + RETURN_IF_CANCELLED(state_); + RETURN_IF_ERROR(state_->GetQueryStatus()); + return Status::OK(); } -Sorter::TupleSorter::TupleIterator Sorter::TupleSorter::Partition(TupleIterator first, - TupleIterator last, Tuple* pivot) { +Status Sorter::TupleSorter::Partition(TupleIterator first, TupleIterator last, + Tuple* pivot, Sorter::TupleSorter::TupleIterator* cut) { // Copy pivot into temp_tuple since it points to a tuple within [first, last). memcpy(temp_tuple_buffer_, pivot, tuple_size_); last.Prev(); while (true) { // Search for the first and last out-of-place elements, and swap them. 
- while (comparator_.Less( - reinterpret_cast<TupleRow*>(&first.current_tuple_), temp_tuple_row_)) { + while (Less(reinterpret_cast<TupleRow*>(&first.current_tuple_), temp_tuple_row_)) { first.Next(); } - while (comparator_.Less( - temp_tuple_row_, reinterpret_cast<TupleRow*>(&last.current_tuple_))) { + while (Less(temp_tuple_row_, reinterpret_cast<TupleRow*>(&last.current_tuple_))) { last.Prev(); } @@ -939,38 +968,43 @@ Sorter::TupleSorter::TupleIterator Sorter::TupleSorter::Partition(TupleIterator first.Next(); last.Prev(); + + RETURN_IF_CANCELLED(state_); + RETURN_IF_ERROR(state_->GetQueryStatus()); } - return first; + *cut = first; + return Status::OK(); } -void Sorter::TupleSorter::SortHelper(TupleIterator first, TupleIterator last) { - if (UNLIKELY(state_->is_cancelled())) return; +Status Sorter::TupleSorter::SortHelper(TupleIterator first, TupleIterator last) { // Use insertion sort for smaller sequences. while (last.index_ - first.index_ > INSERTION_THRESHOLD) { + // Select a pivot and call Partition() to split the tuples in [first, last) into two + // groups (<= pivot and >= pivot) in-place. 'cut' is the index of the first tuple in + // the second group. Tuple* pivot = SelectPivot(first, last); - - // Partition() splits the tuples in [first, last) into two groups (<= pivot - // and >= pivot) in-place. 'cut' is the index of the first tuple in the second group. - TupleIterator cut = Partition(first, last, pivot); + TupleIterator cut; + RETURN_IF_ERROR(Partition(first, last, pivot, &cut)); // Recurse on the smaller partition. This limits stack size to log(n) stack frames. if (cut.index_ - first.index_ < last.index_ - cut.index_) { // Left partition is smaller. - SortHelper(first, cut); + RETURN_IF_ERROR(SortHelper(first, cut)); first = cut; } else { // Right partition is equal or smaller. 
- SortHelper(cut, last); + RETURN_IF_ERROR(SortHelper(cut, last)); last = cut; } - SortHelper(cut, last); + RETURN_IF_ERROR(SortHelper(cut, last)); last = cut; - if (UNLIKELY(state_->is_cancelled())) return; - } - InsertionSort(first, last); + RETURN_IF_CANCELLED(state_); + } + RETURN_IF_ERROR(InsertionSort(first, last)); + return Status::OK(); } Tuple* Sorter::TupleSorter::SelectPivot(TupleIterator first, TupleIterator last) { @@ -1002,9 +1036,9 @@ Tuple* Sorter::TupleSorter::MedianOfThree(Tuple* t1, Tuple* t2, Tuple* t3) { TupleRow* tr2 = reinterpret_cast<TupleRow*>(&t2); TupleRow* tr3 = reinterpret_cast<TupleRow*>(&t3); - bool t1_lt_t2 = comparator_.Less(tr1, tr2); - bool t2_lt_t3 = comparator_.Less(tr2, tr3); - bool t1_lt_t3 = comparator_.Less(tr1, tr3); + bool t1_lt_t2 = Less(tr1, tr2); + bool t2_lt_t3 = Less(tr2, tr3); + bool t1_lt_t3 = Less(tr1, tr3); if (t1_lt_t2) { // t1 < t2 @@ -1222,8 +1256,7 @@ Status Sorter::SortRun() { } { SCOPED_TIMER(in_mem_sort_timer_); - in_mem_tuple_sorter_->Sort(unsorted_run_); - RETURN_IF_CANCELLED(state_); + RETURN_IF_ERROR(in_mem_tuple_sorter_->Sort(unsorted_run_)); } sorted_runs_.push_back(unsorted_run_); unsorted_run_ = NULL; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7501d2e/be/src/util/tuple-row-compare.h ---------------------------------------------------------------------- diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h index 41035df..8505f19 100644 --- a/be/src/util/tuple-row-compare.h +++ b/be/src/util/tuple-row-compare.h @@ -102,7 +102,7 @@ class TupleRowComparator { if (lhs_value != NULL && rhs_value == NULL) return -nulls_first_[i]; int result = RawValue::Compare(lhs_value, rhs_value, - key_expr_ctxs_lhs_[i]->root()->type()); + key_expr_ctxs_lhs_[i]->root()->type()); if (!is_asc_[i]) result = -result; if (result != 0) return result; // Otherwise, try the next Expr @@ -126,7 +126,17 @@ class TupleRowComparator { return Less(lhs_row, rhs_row); } + /// Free any local 
allocations made during expression evaluations in Compare(). + void FreeLocalAllocations() const { + ExprContext::FreeLocalAllocations(key_expr_ctxs_lhs_); + ExprContext::FreeLocalAllocations(key_expr_ctxs_rhs_); + } + private: + /// Codegen Compare(). Returns a non-OK status if codegen is unsuccessful. + /// TODO: have codegen'd users inline this instead of calling through the () operator + Status CodegenCompare(RuntimeState* state, llvm::Function** fn); + const std::vector<ExprContext*>& key_expr_ctxs_lhs_; const std::vector<ExprContext*>& key_expr_ctxs_rhs_; std::vector<bool> is_asc_; @@ -142,10 +152,6 @@ class TupleRowComparator { typedef int (*CompareFn)(ExprContext* const*, ExprContext* const*, TupleRow*, TupleRow*); CompareFn* codegend_compare_fn_; - - /// Codegen Compare(). Returns a non-OK status if codegen is unsuccessful. - /// TODO: have codegen'd users inline this instead of calling through the () operator - Status CodegenCompare(RuntimeState* state, llvm::Function** fn); }; /// Compares the equality of two Tuples, going slot by slot. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7501d2e/testdata/workloads/functional-query/queries/QueryTest/spilling.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test index a29c6c7..c35ddec 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test +++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test @@ -393,11 +393,10 @@ row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\) ---- QUERY # Test sort with inlined char column materialized by exprs. # Set low memory limit to force spilling. +# IMPALA-3332: comparator makes local allocations that cause runaway memory consumption. 
set num_nodes=0; set max_block_mgr_memory=4m; -# IMPALA-3332: comparator makes local allocations that cause runaway memory consumption. -# When IMPALA-3332 is fixed, can reenable this memory limit. -#set mem_limit=200m; +set mem_limit=200m; set disable_outermost_topn=1; select cast(l_comment as char(50)) from lineitem @@ -513,8 +512,7 @@ row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\) set num_nodes=0; set max_block_mgr_memory=4m; # IMPALA-3332: comparator makes local allocations that cause runaway memory consumption. -# When IMPALA-3332 is fixed, can reenable this memory limit. -#set mem_limit=200m; +set mem_limit=200m; set disable_outermost_topn=1; select cast(l_comment as varchar(50)) from lineitem
