IMPALA-5706: Spilling sort optimizations This patch covers multiple changes with the purpose of optimizing the spilling sort mechanism: - Remove the hard-coded maximum limit of buffers that can be used for merging the sorted runs. Instead, this number is calculated based on the memory available through the buffer pool. - The already sorted runs are distributed more optimally between the last intermediate merge and the final merge, to avoid a heavy intermediate merge being followed by a light final merge. - Right before starting the merging phase, the Sorter tries to allocate additional memory through the buffer pool. - An output run is no longer allocated for the final merge.
Note, double-buffering the runs during a merge was also planned with this patch. However, performance testing showed that, except for some exotic queries with an unreasonably small amount of buffer pool memory available, double-buffering doesn't add to the overall performance. This is basically because half of the available buffers have to be sacrificed to do double-buffering, and as a result the merge tree can get deeper. In addition, the amount of I/O wait time does not reach the level where double-buffering could countervail the reduced number of runs during a particular merge. Performance measurements were made during manual testing to verify that this is in fact an optimization: - When doing a sort on top of a join while working with a restricted amount of memory, the Sort node successfully allocates additional memory right before the merging phase. This is feasible because once the Join finishes sending new input data and calls InputDone(), it releases memory that can be picked up by the Sorter. This results in shallower merging trees (more runs grabbed for a merge). - On a multi-node cluster I verified that in cases where at least one merging step is done, this change reduces the execution time for sorts. - The more merging steps are done, the bigger the performance gain is compared to the baseline.
Change-Id: I74857c1694802e81f1cfc765d2b4e8bc644387f9 Reviewed-on: http://gerrit.cloudera.org:8080/9943 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ab7ac5b6 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ab7ac5b6 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ab7ac5b6 Branch: refs/heads/2.x Commit: ab7ac5b6108646f98a9dcfcfb3a17f5ab5861586 Parents: 873053f Author: Gabor Kaszab <gaborkas...@cloudera.com> Authored: Fri Apr 6 17:04:06 2018 +0200 Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org> Committed: Wed Jun 13 03:10:18 2018 +0000 ---------------------------------------------------------------------- be/src/runtime/sorter.cc | 192 +++++++++++++++++++------------------ be/src/runtime/sorter.h | 45 ++++++--- tests/query_test/test_sort.py | 32 ++++--- 3 files changed, 149 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/ab7ac5b6/be/src/runtime/sorter.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc index ead4065..a8bab16 100644 --- a/be/src/runtime/sorter.cc +++ b/be/src/runtime/sorter.cc @@ -44,18 +44,6 @@ namespace impala { // Number of pinned pages required for a merge with fixed-length data only. const int MIN_BUFFERS_PER_MERGE = 3; -// Maximum number of buffers to use in each merge to prevent sorter trying to grab -// all the memory when spilling. Given 8mb buffers, this limits the sorter to using -// 1GB of buffers when merging. -// TODO: this is an arbitrary limit. Once we have reliable reservations (IMPALA-3200) -// we should base this on the number of reservations. 
-const int MAX_BUFFERS_PER_MERGE = 128; - -const string MERGE_FAILED_ERROR_MSG = "Failed to allocate page to merge spilled runs " - "during sorting. Only $0 runs could be merged, but must be able to merge at least 2 " - "to make progress. Reducing query concurrency or increasing the memory limit may " - "help this query to complete successfully."; - /// Wrapper around BufferPool::PageHandle that tracks additional info about the page. /// The Page can be in four states: /// * Closed: The page starts in this state before Init() is called. Calling @@ -259,11 +247,8 @@ class Sorter::Run { /// Prepare to read a sorted run. Pins the first page(s) in the run if the run was /// previously unpinned. If the run was unpinned, try to pin the initial fixed and - /// var len pages in the run. If it couldn't pin them, set pinned to false. - /// In that case, none of the initial pages will be pinned and it is valid to - /// call PrepareRead() again to retry pinning. pinned is always set to - /// true if the run was pinned. - Status PrepareRead(bool* pinned) WARN_UNUSED_RESULT; + /// var len pages in the run. If it couldn't pin them, an error Status is returned. + Status PrepareRead() WARN_UNUSED_RESULT; /// Interface for merger - get the next batch of rows from this run. This run still /// owns the returned batch. Calls GetNext(RowBatch*, bool*). @@ -301,6 +286,13 @@ class Sorter::Run { inline void set_sorted() { is_sorted_ = true; } inline int64_t num_tuples() const { return num_tuples_; } + /// Returns true if we have var-len pages in the run. + inline bool HasVarLenPages() const { + // Shouldn't have any pages unless there are slots. + DCHECK(var_len_pages_.empty() || has_var_len_slots_); + return !var_len_pages_.empty(); + } + private: /// TupleIterator needs access to internals to iterate over tuples. friend class TupleIterator; @@ -362,13 +354,6 @@ class Sorter::Run { /// data is in the next page, in which case 'tuple' is unmodified. 
bool ConvertOffsetsToPtrs(Tuple* tuple); - /// Returns true if we have var-len pages in the run. - inline bool HasVarLenPages() const { - // Shouldn't have any pages unless there are slots. - DCHECK(var_len_pages_.empty() || has_var_len_slots_); - return !var_len_pages_.empty(); - } - static int NumOpenPages(const vector<Page>& pages) { int count = 0; for (const Page& page : pages) { @@ -872,7 +857,7 @@ cleanup_pages: return status; } -Status Sorter::Run::PrepareRead(bool* pinned) { +Status Sorter::Run::PrepareRead() { DCHECK(is_finalized_); DCHECK(is_sorted_); @@ -886,26 +871,18 @@ Status Sorter::Run::PrepareRead(bool* pinned) { sorter_->output_row_desc_, sorter_->state_->batch_size(), sorter_->mem_tracker_)); // If the run is pinned, all pages are already pinned, so we're ready to read. - if (is_pinned_) { - *pinned = true; - return Status::OK(); - } - - int num_to_pin = (fixed_len_pages_.size() > 0 ? 1 : 0) + (HasVarLenPages() ? 1 : 0); - int64_t required_mem = num_to_pin * sorter_->page_len_; - if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) { - *pinned = false; - return Status::OK(); - } + if (is_pinned_) return Status::OK(); - // Attempt to pin the first fixed and var-length pages. + // Pins the first fixed len page. if (fixed_len_pages_.size() > 0) { RETURN_IF_ERROR(fixed_len_pages_[0].Pin(sorter_->buffer_pool_client_)); } + + // Pins the first var len page if there is any. if (HasVarLenPages()) { RETURN_IF_ERROR(var_len_pages_[0].Pin(sorter_->buffer_pool_client_)); } - *pinned = true; + return Status::OK(); } @@ -1555,8 +1532,8 @@ Status Sorter::Open() { return Status::OK(); } -int64_t Sorter::ComputeMinReservation() { - // Must be kept in sync with SortNode.computeNodeResourceProfile() in fe. +// Must be kept in sync with SortNode.computeNodeResourceProfile() in fe. +int64_t Sorter::ComputeMinReservation() const { int min_buffers_required = enable_spilling_ ? 
MIN_BUFFERS_PER_MERGE : 1; // Fixed and var-length pages are separate, so we need double the pages // if there is var-length data. @@ -1605,9 +1582,7 @@ Status Sorter::InputDone() { // The entire input fit in one run. Read sorted rows in GetNext() directly from the // in-memory sorted run. DCHECK(sorted_runs_.back()->is_pinned()); - bool success; - RETURN_IF_ERROR(sorted_runs_.back()->PrepareRead(&success)); - DCHECK(success) << "Should always be able to prepare pinned run for read."; + RETURN_IF_ERROR(sorted_runs_.back()->PrepareRead()); return Status::OK(); } DCHECK(enable_spilling_); @@ -1618,9 +1593,6 @@ Status Sorter::InputDone() { RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages()); // Merge intermediate runs until we have a final merge set-up. - // TODO: Attempt to allocate more memory before doing intermediate merges. This may - // be possible if other operators have relinquished memory after the sort has built - // its runs. This depends on more reliable reservations (IMPALA-3200) return MergeIntermediateRuns(); } @@ -1679,43 +1651,93 @@ Status Sorter::SortCurrentInputRun() { return Status::OK(); } +int Sorter::MaxRunsInNextMerge() const { + int num_available_buffers = buffer_pool_client_->GetUnusedReservation() / page_len_; + DCHECK_GE(num_available_buffers, ComputeMinReservation() / page_len_); + int num_runs_in_one_merge = 0; + int num_required_buffers = 0; + + for (int i = 0; i < sorted_runs_.size(); ++i) { + int num_buffers_for_this_run = (sorted_runs_[i]->HasVarLenPages()) ? 2 : 1; + + if (num_required_buffers + num_buffers_for_this_run <= num_available_buffers) { + num_required_buffers += num_buffers_for_this_run; + ++num_runs_in_one_merge; + } else { + // Not enough buffers to merge all the runs in one final merge. Intermediate merge + // is required. + // Increasing the required buffers count to include the result run of the + // intermediate merge. + num_required_buffers += + (output_row_desc_->tuple_descriptors()[0]->HasVarlenSlots()) ? 
2 : 1; + // Have to reduce the number of runs for this merge to fit in the available buffer + // pool memory. + for (int j = i - 1; j >= 0; --j) { + num_required_buffers -= sorted_runs_[j]->HasVarLenPages() ? 2 : 1; + --num_runs_in_one_merge; + if (num_required_buffers <= num_available_buffers) break; + } + DCHECK_LE(num_required_buffers, num_available_buffers); + break; + } + } + + DCHECK_GT(num_runs_in_one_merge, 1); + return num_runs_in_one_merge; +} + +void Sorter::TryToIncreaseMemAllocationForMerge() { + int pages_needed_for_full_merge = 0; + for (auto run : sorted_runs_) { + pages_needed_for_full_merge += (run->HasVarLenPages()) ? 2 : 1; + } + int available_pages = buffer_pool_client_->GetUnusedReservation() / page_len_; + + // Start allocating more pages than available now. Stop once no more memory can be + // allocated. + for (int i = 0; i < pages_needed_for_full_merge - available_pages; ++i) { + if (!buffer_pool_client_->IncreaseReservation(page_len_)) return; + } +} + +int Sorter::GetNumOfRunsForMerge() const { + int max_runs_in_next_merge = MaxRunsInNextMerge(); + + // Check if all the runs fit in a final merge. Won't need an extra run for the output + // compared to an intermediate run. + if (max_runs_in_next_merge == sorted_runs_.size()) { + return max_runs_in_next_merge; + } + + // If this is the last intermediate merge before the final merge then distributes the + // runs between the intermediate and the final merge to saturate the final merge with + // as many runs as possible reducing the number of merging on the same rows. + if (max_runs_in_next_merge * 2 >= sorted_runs_.size()) { + return sorted_runs_.size() - max_runs_in_next_merge; + } + return max_runs_in_next_merge; +} + Status Sorter::MergeIntermediateRuns() { DCHECK_GE(sorted_runs_.size(), 2); - int pinned_pages_per_run = has_var_len_slots_ ? 
2 : 1; - int max_runs_per_final_merge = MAX_BUFFERS_PER_MERGE / pinned_pages_per_run; - // During an intermediate merge, the one or two pages from the output sorted run - // that are being written must be pinned. - int max_runs_per_intermediate_merge = max_runs_per_final_merge - 1; - DCHECK_GT(max_runs_per_intermediate_merge, 1); + // Attempt to allocate more memory before doing intermediate merges. This may + // be possible if other operators have relinquished memory after the sort has built + // its runs. + TryToIncreaseMemAllocationForMerge(); while (true) { - // An intermediate merge adds one merge to unmerged_sorted_runs_. - // TODO: once we have reliable reservations (IMPALA-3200), we should calculate this - // based on the available reservations. - int num_runs_to_merge = - min<int>(max_runs_per_intermediate_merge, sorted_runs_.size()); + int num_of_runs_to_merge = GetNumOfRunsForMerge(); DCHECK(merge_output_run_ == NULL) << "Should have finished previous merge."; - // Create the merged run in case we need to do intermediate merges. We need the - // output run and at least two input runs in memory to make progress on the - // intermediate merges. - // TODO: this isn't optimal: we could defer creating the merged run if we have - // reliable reservations (IMPALA-3200). + RETURN_IF_ERROR(CreateMerger(num_of_runs_to_merge)); + + // If CreateMerger() consumed all the sorted runs, we have set up the final merge. + if (sorted_runs_.empty()) return Status::OK(); + merge_output_run_ = run_pool_.Add( new Run(this, output_row_desc_->tuple_descriptors()[0], false)); RETURN_IF_ERROR(merge_output_run_->Init()); - RETURN_IF_ERROR(CreateMerger(num_runs_to_merge)); - - // If CreateMerger() consumed all the sorted runs, we have set up the final merge. - if (sorted_runs_.empty()) { - // Don't need intermediate run for final merge. 
- if (merge_output_run_ != NULL) { - merge_output_run_->CloseAllPages(); - merge_output_run_ = NULL; - } - return Status::OK(); - } RETURN_IF_ERROR(ExecuteIntermediateMerge(merge_output_run_)); sorted_runs_.push_back(merge_output_run_); merge_output_run_ = NULL; @@ -1723,8 +1745,8 @@ Status Sorter::MergeIntermediateRuns() { return Status::OK(); } -Status Sorter::CreateMerger(int max_num_runs) { - DCHECK_GE(max_num_runs, 2); +Status Sorter::CreateMerger(int num_runs) { + DCHECK_GE(num_runs, 2); DCHECK_GE(sorted_runs_.size(), 2); // Clean up the runs from the previous merge. Run::CleanupRuns(&merging_runs_); @@ -1736,23 +1758,11 @@ Status Sorter::CreateMerger(int max_num_runs) { new SortedRunMerger(compare_less_than_, output_row_desc_, profile_, true)); vector<function<Status (RowBatch**)>> merge_runs; - merge_runs.reserve(max_num_runs); - for (int i = 0; i < max_num_runs; ++i) { + merge_runs.reserve(num_runs); + for (int i = 0; i < num_runs; ++i) { Run* run = sorted_runs_.front(); - bool success; - RETURN_IF_ERROR(run->PrepareRead(&success)); - if (!success) { - // If we can merge at least two runs, we can continue, otherwise we have a problem - // because we can't make progress on the merging. - // TODO: IMPALA-3200: we should not need this logic once we have reliable - // reservations (IMPALA-3200). - if (merging_runs_.size() < 2) { - return mem_tracker_->MemLimitExceeded( - state_, Substitute(MERGE_FAILED_ERROR_MSG, merging_runs_.size())); - } - // Merge the runs that we were able to prepare. - break; - } + RETURN_IF_ERROR(run->PrepareRead()); + // Run::GetNextBatch() is used by the merger to retrieve a batch of rows to merge // from this run. 
merge_runs.push_back(bind<Status>(mem_fn(&Run::GetNextBatch), run, _1)); http://git-wip-us.apache.org/repos/asf/impala/blob/ab7ac5b6/be/src/runtime/sorter.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h index 2527958..c01a54a 100644 --- a/be/src/runtime/sorter.h +++ b/be/src/runtime/sorter.h @@ -149,8 +149,9 @@ class Sorter { void Close(RuntimeState* state); /// Compute the minimum amount of buffer memory in bytes required to execute a - /// sort with the current sorter. - int64_t ComputeMinReservation(); + /// sort with the current sorter. Must be kept in sync with + /// SortNode.computeNodeResourceProfile() in fe. + int64_t ComputeMinReservation() const; /// Return true if the sorter has any spilled runs. bool HasSpilledRuns() const; @@ -162,21 +163,19 @@ class Sorter { class TupleSorter; /// Create a SortedRunMerger from sorted runs in 'sorted_runs_' and assign it to - /// 'merger_'. Attempts to set up merger with 'max_num_runs' runs but may set it - /// up with fewer if it cannot pin the initial pages of all of the runs. Fails - /// if it cannot merge at least two runs. The runs to be merged are removed from - /// 'sorted_runs_'. The Sorter sets the 'deep_copy_input' flag to true for the - /// merger, since the pages containing input run data will be deleted as input - /// runs are read. - Status CreateMerger(int max_num_runs) WARN_UNUSED_RESULT; + /// 'merger_'. 'num_runs' indicates how many runs should be covered by the current + /// merging attempt. Returns error if memory allocation fails during in + /// Run::PrepareRead(). The runs to be merged are removed from 'sorted_runs_'. The + /// Sorter sets the 'deep_copy_input' flag to true for the merger, since the pages + /// containing input run data will be deleted as input runs are read. 
+ Status CreateMerger(int num_runs) WARN_UNUSED_RESULT; /// Repeatedly replaces multiple smaller runs in sorted_runs_ with a single larger /// merged run until there are few enough runs to be merged with a single merger. - /// Returns when 'merger_' is set up to merge the final runs. - /// At least 1 (2 if var-len slots) page from each sorted run must be pinned for - /// a merge. If the number of sorted runs is too large, merge sets of smaller runs - /// into large runs until a final merge can be performed. An intermediate row batch - /// containing deep copied rows is used for the output of each intermediate merge. + /// Returns when 'merger_' is set up to merge the final runs. If the number of sorted + /// runs is too large, merge sets of smaller runs into large runs until a final merge + /// can be performed. An intermediate row batch containing deep copied rows is used for + /// the output of each intermediate merge. Status MergeIntermediateRuns() WARN_UNUSED_RESULT; /// Execute a single step of the intermediate merge, pulling rows from 'merger_' @@ -190,6 +189,24 @@ class Sorter { /// Helper that cleans up all runs in the sorter. void CleanupAllRuns(); + /// Based on the amount of unused buffers the Sorter has through the BufferPool this + /// function calculates the maximum number of runs that can be taken care of during the + /// next merge intermediate merge. Takes into account that a separate run is needed for + /// the output. + int MaxRunsInNextMerge() const; + + /// Calculates the number of runs the 'merger_' should grab for merging in the current + /// round of merging. Returns at most MaxRunsInNextMerge(), so the Sorter will have + /// enough reservation to merge this number of runs. 
+ int GetNumOfRunsForMerge() const; + + /// If the number of available buffers is not enough to grab all the runs to merge in + /// one round then this functions starts allocating additional free buffers one by one + /// until it reaches the maximum limit the Sorter can have or until we have enough free + /// buffers for all the runs. This is possible if other operators have released memory + /// since the Sorter has started working on it's initial runs. + void TryToIncreaseMemAllocationForMerge(); + /// ID of the ExecNode that owns the sorter, used for error reporting. const int node_id_; http://git-wip-us.apache.org/repos/asf/impala/blob/ab7ac5b6/tests/query_test/test_sort.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py index a6ffd28..66f1742 100644 --- a/tests/query_test/test_sort.py +++ b/tests/query_test/test_sort.py @@ -42,23 +42,23 @@ class TestQueryFullSort(ImpalaTestSuite): cls.ImpalaTestMatrix.add_constraint(lambda v:\ v.get_value('table_format').file_format == 'parquet') - def test_multiple_mem_limits(self, vector): - """Exercise the dynamic memory scaling functionality.""" - - """Using lineitem table forces the multi-phase sort with low mem_limit. This test - takes about a minute""" + def test_multiple_buffer_pool_limits(self, vector): + """Using lineitem table forces the multi-phase sort with low buffer_pool_limit. + This test takes about a minute.""" query = """select l_comment, l_partkey, l_orderkey, l_suppkey, l_commitdate from lineitem order by l_comment limit 100000""" exec_option = copy(vector.get_value('exec_option')) exec_option['disable_outermost_topn'] = 1 + exec_option['num_nodes'] = 1 table_format = vector.get_value('table_format') - """The first run should fit in memory, the 300m run is a 2-phase disk sort, - the 150m run is a multi-phase sort (i.e. 
with an intermediate merge).""" - for mem_limit in ['-1', '300m', '150m']: - exec_option['mem_limit'] = mem_limit - result = transpose_results(self.execute_query( - query, exec_option, table_format=table_format).data) + """The first run should fit in memory, the second run is a 2-phase disk sort, + and the third run is a multi-phase sort (i.e. with an intermediate merge).""" + for buffer_pool_limit in ['-1', '300m', '130m']: + exec_option['buffer_pool_limit'] = buffer_pool_limit + query_result = self.execute_query( + query, exec_option, table_format=table_format) + result = transpose_results(query_result.data) assert(result[0] == sorted(result[0])) def test_multiple_mem_limits_full_output(self, vector): @@ -92,17 +92,19 @@ class TestQueryFullSort(ImpalaTestSuite): assert(result[0] == sorted(result[0])) def test_sort_join(self, vector): - """With 200m memory limit this should be a 2-phase sort""" + """With minimum memory limit this should be a 1-phase sort""" query = """select o1.o_orderdate, o2.o_custkey, o1.o_comment from orders o1 join orders o2 on (o1.o_orderkey = o2.o_orderkey) order by o1.o_orderdate limit 100000""" exec_option = copy(vector.get_value('exec_option')) exec_option['disable_outermost_topn'] = 1 - exec_option['mem_limit'] = "1200m" + exec_option['mem_limit'] = "134m" + exec_option['num_nodes'] = 1 table_format = vector.get_value('table_format') - result = transpose_results(self.execute_query( - query, exec_option, table_format=table_format).data) + query_result = self.execute_query(query, exec_option, table_format=table_format) + assert "TotalMergesPerformed: 1" in query_result.runtime_profile + result = transpose_results(query_result.data) assert(result[0] == sorted(result[0])) def test_sort_union(self, vector):