IMPALA-5706: Spilling sort optimisations

This patch bundles several changes that optimize the spilling sort
mechanism:
  - Remove the hard-coded maximum number of buffers that can be used
    for merging the sorted runs. Instead, this number is calculated
    from the memory available through the buffer pool.
  - The already sorted runs are distributed better between the last
    intermediate merge and the final merge, so that a heavy
    intermediate merge is not followed by a light final merge.
  - Right before starting the merging phase, the Sorter tries to
    allocate additional memory through the buffer pool.
  - An output run is no longer allocated for the final merge.
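
As an illustration of the run distribution described in the second
bullet, here is a minimal Python sketch. It assumes every run pins
exactly one buffer (fixed-length data only) and that an intermediate
merge needs one extra buffer for its output run. The function names are
made up for this sketch and are not the ones used in sorter.cc.

```python
def max_runs_in_next_merge(num_runs, available_buffers):
    """Simplified model: each run pins exactly one buffer."""
    if num_runs <= available_buffers:
        # Everything fits, so this can be the final merge (no output run).
        return num_runs
    # An intermediate merge has to keep one buffer for its output run.
    return available_buffers - 1


def num_runs_for_merge(num_runs, available_buffers):
    max_runs = max_runs_in_next_merge(num_runs, available_buffers)
    if max_runs == num_runs:
        return max_runs
    # Last intermediate merge: take only as many runs as needed so that
    # the final merge is saturated.
    if max_runs * 2 >= num_runs:
        return num_runs - max_runs
    return max_runs


def plan_merges(num_runs, available_buffers):
    """Returns the number of runs consumed by each merge, final merge last."""
    if num_runs < 2 or available_buffers < 3:
        raise ValueError("need at least 2 runs and 3 buffers")
    plan = []
    while True:
        k = num_runs_for_merge(num_runs, available_buffers)
        plan.append(k)
        if k == num_runs:
            return plan
        # The merged output becomes one new sorted run.
        num_runs = num_runs - k + 1


print(plan_merges(10, 8))
```

Under these assumptions, 10 runs and 8 buffers give the plan [3, 8]: a
light intermediate merge of 3 runs followed by a saturated final merge,
instead of merging 7 runs first and leaving only 4 for the final merge.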

Note: double-buffering the runs during a merge was also planned for
this patch. However, performance testing showed that, except for some
exotic queries with an unreasonably small amount of buffer pool memory
available, double-buffering doesn't improve overall performance.
This is because half of the available buffers have to be sacrificed
for double-buffering, and as a result the merge tree can get deeper.
In addition, the I/O wait time does not reach the level where
double-buffering could offset the reduced number of runs in a
particular merge.
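
The depth argument can be illustrated with a rough Python model. It
assumes each pass merges up to fan_in runs into one, ignores the buffer
needed for the output run, and treats double-buffering as simply halving
the fan-in. The numbers are illustrative, not measurements.

```python
def merge_passes(num_runs, fan_in):
    """Number of merge passes needed to reduce num_runs runs to one."""
    if fan_in < 2:
        raise ValueError("fan_in must be at least 2")
    passes = 0
    while num_runs > 1:
        num_runs = -(-num_runs // fan_in)  # ceiling division
        passes += 1
    return passes


buffers = 16
runs = 100
print(merge_passes(runs, buffers))       # one buffer per run
print(merge_passes(runs, buffers // 2))  # two buffers per run
```

In this model, 100 runs with 16 buffers need 2 passes when each run
uses one buffer, but 3 passes when each run uses two.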

Performance measurements were made during manual testing to verify
that this is in fact an optimization:
  - When doing a sort on top of a join with a restricted amount of
    memory, the Sort node successfully allocates additional memory
    right before the merging phase. This is feasible because once the
    Join has finished sending new input data and calls InputDone(), it
    releases memory that can be picked up by the Sorter. This results
    in shallower merge trees (more runs grabbed for a merge).
  - On a multi-node cluster I verified that whenever at least one
    merging step is done, this change reduces the execution time of
    sorts.
  - The more merging steps are done, the bigger the performance gain
    compared to the baseline.
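
The extra allocation mentioned in the first measurement can be sketched
in Python as follows. ToyBufferPoolClient is a hypothetical stand-in
written for this sketch, not Impala's BufferPool API; it only models a
reservation that can grow up to a fixed cap.

```python
class ToyBufferPoolClient:
    """Hypothetical stand-in for a buffer pool client (not Impala's API)."""

    def __init__(self, unused_reservation, max_reservation):
        self.unused_reservation = unused_reservation
        self.max_reservation = max_reservation

    def increase_reservation(self, nbytes):
        # Refuse the request if it would exceed the cap.
        if self.unused_reservation + nbytes > self.max_reservation:
            return False
        self.unused_reservation += nbytes
        return True


def try_to_increase_for_merge(client, pages_per_run, page_len):
    """Asks for pages one by one until a full merge fits or a request fails.

    pages_per_run holds 1 or 2 per run (2 if the run has var-len pages).
    Returns the number of pages available afterwards.
    """
    pages_needed = sum(pages_per_run)
    available_pages = client.unused_reservation // page_len
    for _ in range(pages_needed - available_pages):
        if not client.increase_reservation(page_len):
            break
    return client.unused_reservation // page_len


page_len = 8
client = ToyBufferPoolClient(4 * page_len, 6 * page_len)
print(try_to_increase_for_merge(client, [1, 2, 1, 2, 1], page_len))
```

Here the five runs need 7 pages, the client starts with 4 and is capped
at 6, so the loop stops after obtaining 2 more pages.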

Change-Id: I74857c1694802e81f1cfc765d2b4e8bc644387f9
Reviewed-on: http://gerrit.cloudera.org:8080/9943
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ab7ac5b6
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ab7ac5b6
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ab7ac5b6

Branch: refs/heads/2.x
Commit: ab7ac5b6108646f98a9dcfcfb3a17f5ab5861586
Parents: 873053f
Author: Gabor Kaszab <gaborkas...@cloudera.com>
Authored: Fri Apr 6 17:04:06 2018 +0200
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Wed Jun 13 03:10:18 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/sorter.cc      | 192 +++++++++++++++++++------------------
 be/src/runtime/sorter.h       |  45 ++++++---
 tests/query_test/test_sort.py |  32 ++++---
 3 files changed, 149 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ab7ac5b6/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index ead4065..a8bab16 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -44,18 +44,6 @@ namespace impala {
 // Number of pinned pages required for a merge with fixed-length data only.
 const int MIN_BUFFERS_PER_MERGE = 3;
 
-// Maximum number of buffers to use in each merge to prevent sorter trying to grab
-// all the memory when spilling. Given 8mb buffers, this limits the sorter to using
-// 1GB of buffers when merging.
-// TODO: this is an arbitrary limit. Once we have reliable reservations (IMPALA-3200)
-// we should base this on the number of reservations.
-const int MAX_BUFFERS_PER_MERGE = 128;
-
-const string MERGE_FAILED_ERROR_MSG = "Failed to allocate page to merge spilled runs "
-    "during sorting. Only $0 runs could be merged, but must be able to merge at least 2 "
-    "to make progress. Reducing query concurrency or increasing the memory limit may "
-    "help this query to complete successfully.";
-
 /// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
 /// The Page can be in four states:
 /// * Closed: The page starts in this state before Init() is called. Calling
@@ -259,11 +247,8 @@ class Sorter::Run {
 
   /// Prepare to read a sorted run. Pins the first page(s) in the run if the run was
   /// previously unpinned. If the run was unpinned, try to pin the initial fixed and
-  /// var len pages in the run. If it couldn't pin them, set pinned to false.
-  /// In that case, none of the initial pages will be pinned and it is valid to
-  /// call PrepareRead() again to retry pinning. pinned is always set to
-  /// true if the run was pinned.
-  Status PrepareRead(bool* pinned) WARN_UNUSED_RESULT;
+  /// var len pages in the run. If it couldn't pin them, an error Status is returned.
+  Status PrepareRead() WARN_UNUSED_RESULT;
 
   /// Interface for merger - get the next batch of rows from this run. This run still
   /// owns the returned batch. Calls GetNext(RowBatch*, bool*).
@@ -301,6 +286,13 @@ class Sorter::Run {
   inline void set_sorted() { is_sorted_ = true; }
   inline int64_t num_tuples() const { return num_tuples_; }
 
+  /// Returns true if we have var-len pages in the run.
+  inline bool HasVarLenPages() const {
+    // Shouldn't have any pages unless there are slots.
+    DCHECK(var_len_pages_.empty() || has_var_len_slots_);
+    return !var_len_pages_.empty();
+  }
+
  private:
   /// TupleIterator needs access to internals to iterate over tuples.
   friend class TupleIterator;
@@ -362,13 +354,6 @@ class Sorter::Run {
   /// data is in the next page, in which case 'tuple' is unmodified.
   bool ConvertOffsetsToPtrs(Tuple* tuple);
 
-  /// Returns true if we have var-len pages in the run.
-  inline bool HasVarLenPages() const {
-    // Shouldn't have any pages unless there are slots.
-    DCHECK(var_len_pages_.empty() || has_var_len_slots_);
-    return !var_len_pages_.empty();
-  }
-
   static int NumOpenPages(const vector<Page>& pages) {
     int count = 0;
     for (const Page& page : pages) {
@@ -872,7 +857,7 @@ cleanup_pages:
   return status;
 }
 
-Status Sorter::Run::PrepareRead(bool* pinned) {
+Status Sorter::Run::PrepareRead() {
   DCHECK(is_finalized_);
   DCHECK(is_sorted_);
 
@@ -886,26 +871,18 @@ Status Sorter::Run::PrepareRead(bool* pinned) {
       sorter_->output_row_desc_, sorter_->state_->batch_size(), sorter_->mem_tracker_));
 
   // If the run is pinned, all pages are already pinned, so we're ready to read.
-  if (is_pinned_) {
-    *pinned = true;
-    return Status::OK();
-  }
-
-  int num_to_pin = (fixed_len_pages_.size() > 0 ? 1 : 0) + (HasVarLenPages() ? 1 : 0);
-  int64_t required_mem = num_to_pin * sorter_->page_len_;
-  if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) {
-    *pinned = false;
-    return Status::OK();
-  }
+  if (is_pinned_) return Status::OK();
 
-  // Attempt to pin the first fixed and var-length pages.
+  // Pins the first fixed len page.
   if (fixed_len_pages_.size() > 0) {
     RETURN_IF_ERROR(fixed_len_pages_[0].Pin(sorter_->buffer_pool_client_));
   }
+
+  // Pins the first var len page if there is any.
   if (HasVarLenPages()) {
     RETURN_IF_ERROR(var_len_pages_[0].Pin(sorter_->buffer_pool_client_));
   }
-  *pinned = true;
+
   return Status::OK();
 }
 
@@ -1555,8 +1532,8 @@ Status Sorter::Open() {
   return Status::OK();
 }
 
-int64_t Sorter::ComputeMinReservation() {
-  // Must be kept in sync with SortNode.computeNodeResourceProfile() in fe.
+// Must be kept in sync with SortNode.computeNodeResourceProfile() in fe.
+int64_t Sorter::ComputeMinReservation() const {
   int min_buffers_required = enable_spilling_ ? MIN_BUFFERS_PER_MERGE : 1;
   // Fixed and var-length pages are separate, so we need double the pages
   // if there is var-length data.
@@ -1605,9 +1582,7 @@ Status Sorter::InputDone() {
     // The entire input fit in one run. Read sorted rows in GetNext() directly from the
     // in-memory sorted run.
     DCHECK(sorted_runs_.back()->is_pinned());
-    bool success;
-    RETURN_IF_ERROR(sorted_runs_.back()->PrepareRead(&success));
-    DCHECK(success) << "Should always be able to prepare pinned run for read.";
+    RETURN_IF_ERROR(sorted_runs_.back()->PrepareRead());
     return Status::OK();
   }
   DCHECK(enable_spilling_);
@@ -1618,9 +1593,6 @@ Status Sorter::InputDone() {
   RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
 
   // Merge intermediate runs until we have a final merge set-up.
-  // TODO: Attempt to allocate more memory before doing intermediate merges. This may
-  // be possible if other operators have relinquished memory after the sort has built
-  // its runs. This depends on more reliable reservations (IMPALA-3200)
   return MergeIntermediateRuns();
 }
 
@@ -1679,43 +1651,93 @@ Status Sorter::SortCurrentInputRun() {
   return Status::OK();
 }
 
+int Sorter::MaxRunsInNextMerge() const {
+  int num_available_buffers = buffer_pool_client_->GetUnusedReservation() / page_len_;
+  DCHECK_GE(num_available_buffers, ComputeMinReservation() / page_len_);
+  int num_runs_in_one_merge = 0;
+  int num_required_buffers = 0;
+
+  for (int i = 0; i < sorted_runs_.size(); ++i) {
+    int num_buffers_for_this_run = (sorted_runs_[i]->HasVarLenPages()) ? 2 : 1;
+
+    if (num_required_buffers + num_buffers_for_this_run <= num_available_buffers) {
+      num_required_buffers += num_buffers_for_this_run;
+      ++num_runs_in_one_merge;
+    } else {
+      // Not enough buffers to merge all the runs in one final merge. Intermediate merge
+      // is required.
+      // Increasing the required buffers count to include the result run of the
+      // intermediate merge.
+      num_required_buffers +=
+          (output_row_desc_->tuple_descriptors()[0]->HasVarlenSlots()) ? 2 : 1;
+      // Have to reduce the number of runs for this merge to fit in the available buffer
+      // pool memory.
+      for (int j = i - 1; j >= 0; --j) {
+        num_required_buffers -= sorted_runs_[j]->HasVarLenPages() ? 2 : 1;
+        --num_runs_in_one_merge;
+        if (num_required_buffers <= num_available_buffers) break;
+      }
+      DCHECK_LE(num_required_buffers, num_available_buffers);
+      break;
+    }
+  }
+
+  DCHECK_GT(num_runs_in_one_merge, 1);
+  return num_runs_in_one_merge;
+}
+
+void Sorter::TryToIncreaseMemAllocationForMerge() {
+  int pages_needed_for_full_merge = 0;
+  for (auto run : sorted_runs_) {
+    pages_needed_for_full_merge += (run->HasVarLenPages()) ? 2 : 1;
+  }
+  int available_pages = buffer_pool_client_->GetUnusedReservation() / page_len_;
+
+  // Start allocating more pages than available now. Stop once no more memory can be
+  // allocated.
+  for (int i = 0; i < pages_needed_for_full_merge - available_pages; ++i) {
+    if (!buffer_pool_client_->IncreaseReservation(page_len_)) return;
+  }
+}
+
+int Sorter::GetNumOfRunsForMerge() const {
+  int max_runs_in_next_merge = MaxRunsInNextMerge();
+
+  // Check if all the runs fit in a final merge. Won't need an extra run for the output
+  // compared to an intermediate run.
+  if (max_runs_in_next_merge == sorted_runs_.size()) {
+    return max_runs_in_next_merge;
+  }
+
+  // If this is the last intermediate merge before the final merge then distributes the
+  // runs between the intermediate and the final merge to saturate the final merge with
+  // as many runs as possible reducing the number of merging on the same rows.
+  if (max_runs_in_next_merge * 2 >= sorted_runs_.size()) {
+    return sorted_runs_.size() - max_runs_in_next_merge;
+  }
+  return max_runs_in_next_merge;
+}
+
 Status Sorter::MergeIntermediateRuns() {
   DCHECK_GE(sorted_runs_.size(), 2);
-  int pinned_pages_per_run = has_var_len_slots_ ? 2 : 1;
-  int max_runs_per_final_merge = MAX_BUFFERS_PER_MERGE / pinned_pages_per_run;
 
-  // During an intermediate merge, the one or two pages from the output sorted run
-  // that are being written must be pinned.
-  int max_runs_per_intermediate_merge = max_runs_per_final_merge - 1;
-  DCHECK_GT(max_runs_per_intermediate_merge, 1);
+  // Attempt to allocate more memory before doing intermediate merges. This may
+  // be possible if other operators have relinquished memory after the sort has built
+  // its runs.
+  TryToIncreaseMemAllocationForMerge();
 
   while (true) {
-    // An intermediate merge adds one merge to unmerged_sorted_runs_.
-    // TODO: once we have reliable reservations (IMPALA-3200), we should calculate this
-    // based on the available reservations.
-    int num_runs_to_merge =
-        min<int>(max_runs_per_intermediate_merge, sorted_runs_.size());
+    int num_of_runs_to_merge = GetNumOfRunsForMerge();
 
     DCHECK(merge_output_run_ == NULL) << "Should have finished previous merge.";
-    // Create the merged run in case we need to do intermediate merges. We need the
-    // output run and at least two input runs in memory to make progress on the
-    // intermediate merges.
-    // TODO: this isn't optimal: we could defer creating the merged run if we have
-    // reliable reservations (IMPALA-3200).
+    RETURN_IF_ERROR(CreateMerger(num_of_runs_to_merge));
+
+    // If CreateMerger() consumed all the sorted runs, we have set up the final merge.
+    if (sorted_runs_.empty()) return Status::OK();
+
     merge_output_run_ = run_pool_.Add(
         new Run(this, output_row_desc_->tuple_descriptors()[0], false));
     RETURN_IF_ERROR(merge_output_run_->Init());
-    RETURN_IF_ERROR(CreateMerger(num_runs_to_merge));
-
-    // If CreateMerger() consumed all the sorted runs, we have set up the final merge.
-    if (sorted_runs_.empty()) {
-      // Don't need intermediate run for final merge.
-      if (merge_output_run_ != NULL) {
-        merge_output_run_->CloseAllPages();
-        merge_output_run_ = NULL;
-      }
-      return Status::OK();
-    }
     RETURN_IF_ERROR(ExecuteIntermediateMerge(merge_output_run_));
     sorted_runs_.push_back(merge_output_run_);
     merge_output_run_ = NULL;
@@ -1723,8 +1745,8 @@ Status Sorter::MergeIntermediateRuns() {
   return Status::OK();
 }
 
-Status Sorter::CreateMerger(int max_num_runs) {
-  DCHECK_GE(max_num_runs, 2);
+Status Sorter::CreateMerger(int num_runs) {
+  DCHECK_GE(num_runs, 2);
   DCHECK_GE(sorted_runs_.size(), 2);
   // Clean up the runs from the previous merge.
   Run::CleanupRuns(&merging_runs_);
@@ -1736,23 +1758,11 @@ Status Sorter::CreateMerger(int max_num_runs) {
       new SortedRunMerger(compare_less_than_, output_row_desc_, profile_, true));
 
   vector<function<Status (RowBatch**)>> merge_runs;
-  merge_runs.reserve(max_num_runs);
-  for (int i = 0; i < max_num_runs; ++i) {
+  merge_runs.reserve(num_runs);
+  for (int i = 0; i < num_runs; ++i) {
     Run* run = sorted_runs_.front();
-    bool success;
-    RETURN_IF_ERROR(run->PrepareRead(&success));
-    if (!success) {
-      // If we can merge at least two runs, we can continue, otherwise we have a problem
-      // because we can't make progress on the merging.
-      // TODO: IMPALA-3200: we should not need this logic once we have reliable
-      // reservations (IMPALA-3200).
-      if (merging_runs_.size() < 2) {
-        return mem_tracker_->MemLimitExceeded(
-            state_, Substitute(MERGE_FAILED_ERROR_MSG, merging_runs_.size()));
-      }
-      // Merge the runs that we were able to prepare.
-      break;
-    }
+    RETURN_IF_ERROR(run->PrepareRead());
+
     // Run::GetNextBatch() is used by the merger to retrieve a batch of rows to merge
     // from this run.
     merge_runs.push_back(bind<Status>(mem_fn(&Run::GetNextBatch), run, _1));

http://git-wip-us.apache.org/repos/asf/impala/blob/ab7ac5b6/be/src/runtime/sorter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index 2527958..c01a54a 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -149,8 +149,9 @@ class Sorter {
   void Close(RuntimeState* state);
 
   /// Compute the minimum amount of buffer memory in bytes required to execute a
-  /// sort with the current sorter.
-  int64_t ComputeMinReservation();
+  /// sort with the current sorter. Must be kept in sync with
+  /// SortNode.computeNodeResourceProfile() in fe.
+  int64_t ComputeMinReservation() const;
 
   /// Return true if the sorter has any spilled runs.
   bool HasSpilledRuns() const;
@@ -162,21 +163,19 @@ class Sorter {
   class TupleSorter;
 
   /// Create a SortedRunMerger from sorted runs in 'sorted_runs_' and assign it to
-  /// 'merger_'. Attempts to set up merger with 'max_num_runs' runs but may set it
-  /// up with fewer if it cannot pin the initial pages of all of the runs. Fails
-  /// if it cannot merge at least two runs. The runs to be merged are removed from
-  /// 'sorted_runs_'.  The Sorter sets the 'deep_copy_input' flag to true for the
-  /// merger, since the pages containing input run data will be deleted as input
-  /// runs are read.
-  Status CreateMerger(int max_num_runs) WARN_UNUSED_RESULT;
+  /// 'merger_'. 'num_runs' indicates how many runs should be covered by the current
+  /// merging attempt. Returns error if memory allocation fails during in
+  /// Run::PrepareRead(). The runs to be merged are removed from 'sorted_runs_'. The
+  /// Sorter sets the 'deep_copy_input' flag to true for the merger, since the pages
+  /// containing input run data will be deleted as input runs are read.
+  Status CreateMerger(int num_runs) WARN_UNUSED_RESULT;
 
   /// Repeatedly replaces multiple smaller runs in sorted_runs_ with a single larger
   /// merged run until there are few enough runs to be merged with a single merger.
-  /// Returns when 'merger_' is set up to merge the final runs.
-  /// At least 1 (2 if var-len slots) page from each sorted run must be pinned for
-  /// a merge. If the number of sorted runs is too large, merge sets of smaller runs
-  /// into large runs until a final merge can be performed. An intermediate row batch
-  /// containing deep copied rows is used for the output of each intermediate merge.
+  /// Returns when 'merger_' is set up to merge the final runs. If the number of sorted
+  /// runs is too large, merge sets of smaller runs into large runs until a final merge
+  /// can be performed. An intermediate row batch containing deep copied rows is used for
+  /// the output of each intermediate merge.
   Status MergeIntermediateRuns() WARN_UNUSED_RESULT;
 
   /// Execute a single step of the intermediate merge, pulling rows from 'merger_'
@@ -190,6 +189,24 @@ class Sorter {
   /// Helper that cleans up all runs in the sorter.
   void CleanupAllRuns();
 
+  /// Based on the amount of unused buffers the Sorter has through the BufferPool this
+  /// function calculates the maximum number of runs that can be taken care of during the
+  /// next merge intermediate merge. Takes into account that a separate run is needed for
+  /// the output.
+  int MaxRunsInNextMerge() const;
+
+  /// Calculates the number of runs the 'merger_' should grab for merging in the current
+  /// round of merging. Returns at most MaxRunsInNextMerge(), so the Sorter will have
+  /// enough reservation to merge this number of runs.
+  int GetNumOfRunsForMerge() const;
+
+  /// If the number of available buffers is not enough to grab all the runs to merge in
+  /// one round then this functions starts allocating additional free buffers one by one
+  /// until it reaches the maximum limit the Sorter can have or until we have enough free
+  /// buffers for all the runs. This is possible if other operators have released memory
+  /// since the Sorter has started working on it's initial runs.
+  void TryToIncreaseMemAllocationForMerge();
+
   /// ID of the ExecNode that owns the sorter, used for error reporting.
   const int node_id_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/ab7ac5b6/tests/query_test/test_sort.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py
index a6ffd28..66f1742 100644
--- a/tests/query_test/test_sort.py
+++ b/tests/query_test/test_sort.py
@@ -42,23 +42,23 @@ class TestQueryFullSort(ImpalaTestSuite):
       cls.ImpalaTestMatrix.add_constraint(lambda v:\
           v.get_value('table_format').file_format == 'parquet')
 
-  def test_multiple_mem_limits(self, vector):
-    """Exercise the dynamic memory scaling functionality."""
-
-    """Using lineitem table forces the multi-phase sort with low mem_limit. This test
-       takes about a minute"""
+  def test_multiple_buffer_pool_limits(self, vector):
+    """Using lineitem table forces the multi-phase sort with low buffer_pool_limit.
+       This test takes about a minute."""
     query = """select l_comment, l_partkey, l_orderkey, l_suppkey, l_commitdate
             from lineitem order by l_comment limit 100000"""
     exec_option = copy(vector.get_value('exec_option'))
     exec_option['disable_outermost_topn'] = 1
+    exec_option['num_nodes'] = 1
     table_format = vector.get_value('table_format')
 
-    """The first run should fit in memory, the 300m run is a 2-phase disk sort,
-       the 150m run is a multi-phase sort (i.e. with an intermediate merge)."""
-    for mem_limit in ['-1', '300m', '150m']:
-      exec_option['mem_limit'] = mem_limit
-      result = transpose_results(self.execute_query(
-        query, exec_option, table_format=table_format).data)
+    """The first run should fit in memory, the second run is a 2-phase disk sort,
+       and the third run is a multi-phase sort (i.e. with an intermediate merge)."""
+    for buffer_pool_limit in ['-1', '300m', '130m']:
+      exec_option['buffer_pool_limit'] = buffer_pool_limit
+      query_result = self.execute_query(
+        query, exec_option, table_format=table_format)
+      result = transpose_results(query_result.data)
       assert(result[0] == sorted(result[0]))
 
   def test_multiple_mem_limits_full_output(self, vector):
@@ -92,17 +92,19 @@ class TestQueryFullSort(ImpalaTestSuite):
       assert(result[0] == sorted(result[0]))
 
   def test_sort_join(self, vector):
-    """With 200m memory limit this should be a 2-phase sort"""
+    """With minimum memory limit this should be a 1-phase sort"""
     query = """select o1.o_orderdate, o2.o_custkey, o1.o_comment from orders o1 join
     orders o2 on (o1.o_orderkey = o2.o_orderkey) order by o1.o_orderdate limit 100000"""
 
     exec_option = copy(vector.get_value('exec_option'))
     exec_option['disable_outermost_topn'] = 1
-    exec_option['mem_limit'] = "1200m"
+    exec_option['mem_limit'] = "134m"
+    exec_option['num_nodes'] = 1
     table_format = vector.get_value('table_format')
 
-    result = transpose_results(self.execute_query(
-      query, exec_option, table_format=table_format).data)
+    query_result = self.execute_query(query, exec_option, table_format=table_format)
+    assert "TotalMergesPerformed: 1" in query_result.runtime_profile
+    result = transpose_results(query_result.data)
     assert(result[0] == sorted(result[0]))
 
   def test_sort_union(self, vector):
