This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 80c1d2dba IMPALA-4530: Implement in-memory merge of quicksorted runs
80c1d2dba is described below

commit 80c1d2dbaabc78bdf1b6e4da5475bfa365cd375e
Author: noemi <[email protected]>
AuthorDate: Wed Apr 6 16:36:27 2022 +0200

    IMPALA-4530: Implement in-memory merge of quicksorted runs
    
    This change aims to decrease back-pressure in the sorter. It offers an
    alternative for the in-memory run formation strategy and sorting
    algorithm by introducing a new in-memory merge level between the
    in-memory quicksort and the external merge phase.
    Instead of forming one big run, it produces many smaller in-memory runs
    (called miniruns), sorts those with quicksort, then merges them
    in memory, before spilling or serving GetNext().
    The external merge phase remains the same.
    Works with MAX_SORT_RUN_SIZE development query option that determines
    the maximum number of pages in a 'minirun'. The default value of
    MAX_SORT_RUN_SIZE is 0, which keeps the original implementation of 1
    big initial in-memory run. Other options are integers of 2 and above.
    The recommended value is 10 or more, to avoid high fragmentation
    in case of large workloads and variable length data.
    
    Testing:
    - added MAX_SORT_RUN_SIZE as an additional test dimension to
      test_sort.py with values [0, 2, 20]
    - additional partial sort test case (inserting into partitioned
      kudu table)
    - manual E2E testing
    
    Change-Id: I58c0ae112e279b93426752895ded7b1a3791865c
    Reviewed-on: http://gerrit.cloudera.org:8080/18393
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Tested-by: Csaba Ringhofer <[email protected]>
---
 be/src/exec/partial-sort-node.cc   |   8 +-
 be/src/exec/partial-sort-node.h    |   3 +
 be/src/runtime/sorter-internal.h   |  40 +++++-
 be/src/runtime/sorter.cc           | 272 +++++++++++++++++++++++++++++++++----
 be/src/runtime/sorter.h            |  56 ++++++--
 be/src/service/query-options.cc    |   9 ++
 be/src/service/query-options.h     |   5 +-
 be/src/util/tuple-row-compare.h    |  22 ++-
 bin/perf_tools/perf-query.sh       |   2 +-
 common/thrift/ImpalaService.thrift |  10 ++
 common/thrift/Query.thrift         |   5 +
 tests/query_test/test_sort.py      |  82 +++++++++--
 12 files changed, 452 insertions(+), 62 deletions(-)

diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index 1fadbe523..5919bb46a 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -66,6 +66,7 @@ PartialSortNode::PartialSortNode(
     input_eos_(false),
     sorter_eos_(true) {
   runtime_profile()->AddInfoString("SortType", "Partial");
+  child_get_next_timer_ = ADD_SUMMARY_STATS_TIMER(runtime_profile(), 
"ChildGetNextTime");
 }
 
 PartialSortNode::~PartialSortNode() {
@@ -143,7 +144,12 @@ Status PartialSortNode::GetNext(RuntimeState* state, 
RowBatch* row_batch, bool*
     if (input_batch_index_ == input_batch_->num_rows()) {
       input_batch_->Reset();
       input_batch_index_ = 0;
-      RETURN_IF_ERROR(child(0)->GetNext(state, input_batch_.get(), 
&input_eos_));
+      MonotonicStopWatch timer;
+      timer.Start();
+      Status status = child(0)->GetNext(state, input_batch_.get(), 
&input_eos_);
+      timer.Stop();
+      RETURN_IF_ERROR(status);
+      child_get_next_timer_->UpdateCounter(timer.ElapsedTime());
     }
 
     int num_processed;
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
index a02bb2fca..0f1ebe456 100644
--- a/be/src/exec/partial-sort-node.h
+++ b/be/src/exec/partial-sort-node.h
@@ -86,6 +86,9 @@ class PartialSortNode : public ExecNode {
 
   const TupleRowComparatorConfig& tuple_row_comparator_config_;
 
+  /// Min, max, and avg time spent in calling GetNext on child
+  RuntimeProfile::SummaryStatsCounter* child_get_next_timer_;
+
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
diff --git a/be/src/runtime/sorter-internal.h b/be/src/runtime/sorter-internal.h
index 7661051ba..a974af456 100644
--- a/be/src/runtime/sorter-internal.h
+++ b/be/src/runtime/sorter-internal.h
@@ -114,13 +114,22 @@ class Sorter::Page {
 ///
 /// Runs are either "initial runs" constructed from the sorter's input by 
evaluating
 /// the expressions in 'sort_tuple_exprs_' or "intermediate runs" constructed
-/// by merging already-sorted runs. Initial runs are sorted in-place in 
memory. Once
-/// sorted, runs can be spilled to disk to free up memory. Sorted runs are 
merged by
+/// by merging already-sorted runs. Initial runs are sorted in-place in memory.
+/// Once sorted, runs can be spilled to disk to free up memory. Sorted runs 
are merged by
 /// SortedRunMerger, either to produce the final sorted output or to produce 
another
 /// sorted run.
+/// By default, the size of initial runs is determined by the available 
memory: the
+/// sorter tries to add batches to the run until some (memory) limit is 
reached.
+/// Some query options can also limit the size of an initial (or in-memory) 
run.
+/// SORT_RUN_BYTES_LIMIT triggers spilling after the size of data in the run 
exceeds the
+/// given threshold (usually expressed in MB or GB).
+/// MAX_SORT_RUN_SIZE allows constructing runs up to a certain size by 
limiting the
+/// number of pages in the initial runs. These smaller in-memory runs are also 
referred
+/// to as 'miniruns'. Miniruns are not spilled immediately, but sorted 
in-place first,
+/// and collected to be merged in memory before spilling the produced output 
run to disk.
 ///
 /// The expected calling sequence of functions is as follows:
-/// * Init() to initialize the run and allocate initial pages.
+/// * Init() or TryInit() to initialize the run and allocate initial pages.
 /// * Add*Batch() to add batches of tuples to the run.
 /// * FinalizeInput() to signal that no more batches will be added.
 /// * If the run is unsorted, it must be sorted. After that set_sorted() must 
be called.
@@ -141,13 +150,23 @@ class Sorter::Run {
   /// var-len data into var_len_copy_page_.
   Status Init();
 
+  /// Similar to Init(), except for the following differences:
+  /// It is only used to initialize miniruns (query option MAX_SORT_RUN_SIZE > 
0 cases).
+  /// The first in-memory run is always initialized by calling Init(), because 
that must
+  /// succeed. The following ones are initialized by TryInit().
+  /// TryInit() allocates one fixed-len page and one var-len page if 
'has_var_len_slots_'
+  /// is true. There is no need for var_len_copy_page here. Returns false if
+  /// initialization was successful, returns true, if reservation was not 
enough.
+  Status TryInit(bool* allocation_failed);
+
   /// Add the rows from 'batch' starting at 'start_index' to the current run. 
Returns
   /// the number of rows actually added in 'num_processed'. If the run is full 
(no more
   /// pages can be allocated), 'num_processed' may be less than the number of 
remaining
   /// rows in the batch. AddInputBatch() materializes the input rows using the
   /// expressions in sorter_->sort_tuple_expr_evals_, while 
AddIntermediateBatch() just
   /// copies rows.
-  Status AddInputBatch(RowBatch* batch, int start_index, int* num_processed);
+  Status AddInputBatch(
+      RowBatch* batch, int start_index, int* num_processed, bool* 
allocation_failed);
 
   Status AddIntermediateBatch(RowBatch* batch, int start_index, int* 
num_processed);
 
@@ -199,6 +218,9 @@ class Sorter::Run {
   bool is_finalized() const { return is_finalized_; }
   bool is_sorted() const { return is_sorted_; }
   void set_sorted() { is_sorted_ = true; }
+  int max_num_of_pages() const { return max_num_of_pages_; }
+  int fixed_len_size() { return fixed_len_pages_.size(); }
+  int run_size() { return fixed_len_pages_.size() + var_len_pages_.size(); }
   int64_t num_tuples() const { return num_tuples_; }
   /// Returns true if we have var-len pages in the run.
   bool HasVarLenPages() const {
@@ -215,8 +237,8 @@ class Sorter::Run {
   /// INITIAL_RUN and HAS_VAR_LEN_SLOTS are template arguments for performance 
and must
   /// match 'initial_run_' and 'has_var_len_slots_'.
   template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
-  Status AddBatchInternal(
-      RowBatch* batch, int start_index, int* num_processed);
+  Status AddBatchInternal(RowBatch* batch, int start_index, int* num_processed,
+        bool* allocation_failed);
 
   /// Finalize the list of pages: delete empty final pages and unpin the 
previous page
   /// if the run is unpinned.
@@ -352,6 +374,12 @@ class Sorter::Run {
   /// Used to implement GetNextBatch() interface required for the merger.
   boost::scoped_ptr<RowBatch> buffered_batch_;
 
+  /// Max number of fixed-len + var-len pages in an in-memory minirun. It 
defines the
+  /// length of a minirun.
+  /// The default value is 0 which means that only 1 in-memory run will be 
created, and
+  /// its size will be determined by other limits eg. memory or 
sort_run_bytes_limit.
+  int max_num_of_pages_;
+
   /// Members used when a run is read in GetNext().
   /// The index into 'fixed_' and 'var_len_pages_' of the pages being read in 
GetNext().
   int fixed_len_pages_index_;
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index ae885252b..93a4cb908 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -121,11 +121,12 @@ Sorter::Run::Run(Sorter* parent, TupleDescriptor* 
sort_tuple_desc, bool initial_
     is_pinned_(initial_run),
     is_finalized_(false),
     is_sorted_(!initial_run),
-    num_tuples_(0) {}
+    num_tuples_(0),
+    max_num_of_pages_(initial_run ? parent->inmem_run_max_pages_ : 0) {}
 
 Status Sorter::Run::Init() {
-  int num_to_create = 1 + has_var_len_slots_
-      + (has_var_len_slots_ && initial_run_ && sorter_->enable_spilling_);
+  int num_to_create = 1 + has_var_len_slots_ + (has_var_len_slots_ && 
initial_run_ &&
+      (sorter_->enable_spilling_ && max_num_of_pages_ == 0));
   int64_t required_mem = num_to_create * sorter_->page_len_;
   if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) {
     return Status(Substitute(
@@ -152,9 +153,32 @@ Status Sorter::Run::Init() {
   return Status::OK();
 }
 
+Status Sorter::Run::TryInit(bool* allocation_failed) {
+  *allocation_failed = true;
+  DCHECK_GT(sorter_->inmem_run_max_pages_, 0);
+  // No need for additional copy page because var-len data is not reordered.
+  // The in-memory merger can copy var-len data directly from the in-memory 
runs,
+  // which are kept until the merge is finished
+  int num_to_create = 1 + has_var_len_slots_;
+  int64_t required_mem = num_to_create * sorter_->page_len_;
+  if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) {
+    return Status::OK();
+  }
+
+  RETURN_IF_ERROR(AddPage(&fixed_len_pages_));
+  if (has_var_len_slots_) {
+    RETURN_IF_ERROR(AddPage(&var_len_pages_));
+  }
+  if (initial_run_) {
+    sorter_->initial_runs_counter_->Add(1);
+  }
+  *allocation_failed = false;
+  return Status::OK();
+}
+
 template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
 Status Sorter::Run::AddBatchInternal(
-    RowBatch* batch, int start_index, int* num_processed) {
+    RowBatch* batch, int start_index, int* num_processed, bool* 
allocation_failed) {
   DCHECK(!is_finalized_);
   DCHECK(!fixed_len_pages_.empty());
   DCHECK_EQ(HAS_VAR_LEN_SLOTS, has_var_len_slots_);
@@ -226,6 +250,7 @@ Status Sorter::Run::AddBatchInternal(
             // There was not enough space in the last var-len page for this 
tuple, and
             // the run could not be extended. Return the fixed-len allocation 
and exit.
             cur_fixed_len_page->FreeBytes(sort_tuple_size_);
+            *allocation_failed = true;
             return Status::OK();
           }
         }
@@ -246,13 +271,21 @@ Status Sorter::Run::AddBatchInternal(
 
     // If there are still rows left to process, get a new page for the 
fixed-length
     // tuples. If the run is already too long, return.
+    if (INITIAL_RUN && max_num_of_pages_ > 0 && run_size() >= 
max_num_of_pages_){
+      *allocation_failed = false;
+      return Status::OK();
+    }
     if (cur_input_index < batch->num_rows()) {
       bool added;
       RETURN_IF_ERROR(TryAddPage(add_mode, &fixed_len_pages_, &added));
-      if (!added) return Status::OK();
+      if (!added) {
+        *allocation_failed = true;
+        return Status::OK();
+      }
       cur_fixed_len_page = &fixed_len_pages_.back();
     }
   }
+  *allocation_failed = false;
   return Status::OK();
 }
 
@@ -773,22 +806,28 @@ int64_t Sorter::Run::TotalBytes() const {
   return total_bytes;
 }
 
-Status Sorter::Run::AddInputBatch(RowBatch* batch, int start_index, int* 
num_processed) {
+Status Sorter::Run::AddInputBatch(RowBatch* batch, int start_index, int* 
num_processed,
+    bool* allocation_failed) {
   DCHECK(initial_run_);
   if (has_var_len_slots_) {
-    return AddBatchInternal<true, true>(batch, start_index, num_processed);
+    return AddBatchInternal<true, true>(
+        batch, start_index, num_processed, allocation_failed);
   } else {
-    return AddBatchInternal<false, true>(batch, start_index, num_processed);
+    return AddBatchInternal<false, true>(
+        batch, start_index, num_processed, allocation_failed);
   }
 }
 
 Status Sorter::Run::AddIntermediateBatch(
     RowBatch* batch, int start_index, int* num_processed) {
   DCHECK(!initial_run_);
+  bool allocation_failed = false;
   if (has_var_len_slots_) {
-    return AddBatchInternal<true, false>(batch, start_index, num_processed);
+    return AddBatchInternal<true, false>(
+        batch, start_index, num_processed, &allocation_failed);
   } else {
-    return AddBatchInternal<false, false>(batch, start_index, num_processed);
+    return AddBatchInternal<false, false>(
+        batch, start_index, num_processed, &allocation_failed);
   }
 }
 
@@ -909,8 +948,10 @@ Sorter::Sorter(const TupleRowComparatorConfig& 
tuple_row_comparator_config,
     initial_runs_counter_(nullptr),
     num_merges_counter_(nullptr),
     in_mem_sort_timer_(nullptr),
+    in_mem_merge_timer_(nullptr),
     sorted_data_size_(nullptr),
-    run_sizes_(nullptr) {
+    run_sizes_(nullptr),
+    inmem_run_max_pages_(state->query_options().max_sort_run_size) {
   switch (tuple_row_comparator_config.sorting_order_) {
     case TSortingOrder::LEXICAL:
       compare_less_than_.reset(
@@ -922,13 +963,13 @@ Sorter::Sorter(const TupleRowComparatorConfig& 
tuple_row_comparator_config,
     default:
       DCHECK(false);
   }
-
   if (estimated_input_size > 0) ComputeSpillEstimate(estimated_input_size);
 }
 
 Sorter::~Sorter() {
   DCHECK(sorted_runs_.empty());
   DCHECK(merging_runs_.empty());
+  DCHECK(sorted_inmem_runs_.empty());
   DCHECK(unsorted_run_ == nullptr);
   DCHECK(merge_output_run_ == nullptr);
 }
@@ -977,6 +1018,7 @@ Status Sorter::Prepare(ObjectPool* obj_pool) {
     initial_runs_counter_ = ADD_COUNTER(profile_, "RunsCreated", TUnit::UNIT);
   }
   in_mem_sort_timer_ = ADD_TIMER(profile_, "InMemorySortTime");
+  in_mem_merge_timer_ = ADD_TIMER(profile_, "InMemoryMergeTime");
   sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES);
   run_sizes_ = ADD_SUMMARY_STATS_COUNTER(profile_, "NumRowsPerRun", 
TUnit::UNIT);
 
@@ -1016,12 +1058,23 @@ Status Sorter::AddBatch(RowBatch* batch) {
     RETURN_IF_ERROR(AddBatchNoSpill(batch, cur_batch_index, &num_processed));
 
     cur_batch_index += num_processed;
+
     if (MustSortAndSpill(cur_batch_index, batch->num_rows())) {
-      // The current run is full. Sort it, spill it and begin the next one.
-      int64_t unsorted_run_bytes = unsorted_run_->TotalBytes();
       RETURN_IF_ERROR(state_->StartSpilling(mem_tracker_));
-      RETURN_IF_ERROR(SortCurrentInputRun());
-      RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
+      int64_t unsorted_run_bytes = unsorted_run_->TotalBytes();
+
+      if (inmem_run_max_pages_ == 0) {
+        // The current run is full. Sort it and spill it.
+        RETURN_IF_ERROR(SortCurrentInputRun());
+        sorted_runs_.push_back(unsorted_run_);
+        unsorted_run_ = nullptr;
+        RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
+      } else {
+        // The memory is full with miniruns. Sort, merge and spill them.
+        RETURN_IF_ERROR(MergeAndSpill());
+      }
+
+      // After we freed memory by spilling, initialize the next run.
       unsorted_run_ =
           run_pool_.Add(new Run(this, 
output_row_desc_->tuple_descriptors()[0], true));
       RETURN_IF_ERROR(unsorted_run_->Init());
@@ -1058,6 +1111,70 @@ int64_t Sorter::GetSortRunBytesLimit() const {
   }
 }
 
+Status Sorter::InitializeNewMinirun(bool* allocation_failed) {
+  // The minirun reached its size limit (max_num_of_pages).
+  // We should sort the run, append to the sorted miniruns, and start a new 
minirun.
+  DCHECK(!*allocation_failed && unsorted_run_->run_size() == 
inmem_run_max_pages_);
+
+  // When the first minirun is full, and there are more tuples to come, we 
first
+  // need to ensure that these in-memory miniruns can be merged later, by 
trying
+  // to reserve pages for the output run of the in-memory merger. Only if this
+  // initialization was successful, can we move on to create the 2nd inmem run.
+  // If it fails and/or only 1 inmem_run could fit into memory, start spilling.
+  // No need to initialize 'merge_output_run_' if spilling is disabled, 
because the
+  // output will be read directly from the merger in GetNext().
+  if (enable_spilling_ && sorted_inmem_runs_.empty()) {
+    DCHECK(merge_output_run_ == nullptr) << "Should have finished previous 
merge.";
+    merge_output_run_ = run_pool_.Add(
+        new Run(this, output_row_desc_->tuple_descriptors()[0], false));
+    RETURN_IF_ERROR(merge_output_run_->TryInit(allocation_failed));
+    if (*allocation_failed) {
+      return Status::OK();
+    }
+  }
+  RETURN_IF_ERROR(SortCurrentInputRun());
+  sorted_inmem_runs_.push_back(unsorted_run_);
+  unsorted_run_ =
+      run_pool_.Add(new Run(this, output_row_desc_->tuple_descriptors()[0], 
true));
+  RETURN_IF_ERROR(unsorted_run_->TryInit(allocation_failed));
+  if (*allocation_failed) {
+    unsorted_run_->CloseAllPages();
+  }
+  return Status::OK();
+}
+
+Status Sorter::MergeAndSpill() {
+  // The last minirun might have been created just before we ran out of memory.
+  // In this case it should not be sorted and merged.
+  if (unsorted_run_->run_size() == 0){
+    unsorted_run_ = nullptr;
+  } else {
+    RETURN_IF_ERROR(SortCurrentInputRun());
+    sorted_inmem_runs_.push_back(unsorted_run_);
+  }
+
+  // If only 1 run was created, do not merge.
+  if (sorted_inmem_runs_.size() == 1) {
+    sorted_runs_.push_back(sorted_inmem_runs_.back());
+    sorted_inmem_runs_.clear();
+    DCHECK_GT(sorted_runs_.back()->fixed_len_size(), 0);
+    RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
+    // If 'merge_output_run_' was initialized but no merge was executed,
+    // set it back to nullptr.
+    if (merge_output_run_ != nullptr){
+      merge_output_run_->CloseAllPages();
+      merge_output_run_ = nullptr;
+    }
+  } else {
+    DCHECK(merge_output_run_ != nullptr) << "Should have reserved memory for 
the merger.";
+    RETURN_IF_ERROR(MergeInMemoryRuns());
+    DCHECK(merge_output_run_ == nullptr) << "Should have finished previous 
merge.";
+  }
+
+  DCHECK(sorted_inmem_runs_.empty());
+  return Status::OK();
+}
+
 bool Sorter::MustSortAndSpill(const int rows_added, const int batch_num_rows) {
   if (rows_added < batch_num_rows) {
     return true;
@@ -1086,7 +1203,28 @@ void Sorter::TryLowerMemUpToSortRunBytesLimit() {
 
 Status Sorter::AddBatchNoSpill(RowBatch* batch, int start_index, int* 
num_processed) {
   DCHECK(batch != nullptr);
-  RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index, 
num_processed));
+  bool allocation_failed = false;
+
+  RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index, 
num_processed,
+      &allocation_failed));
+
+  if (inmem_run_max_pages_ > 0) {
+    start_index += *num_processed;
+    // We try to add the entire input batch. If it does not fit into 1 minirun,
+    // initialize a new one.
+    while (!allocation_failed && start_index < batch->num_rows()) {
+      RETURN_IF_ERROR(InitializeNewMinirun(&allocation_failed));
+      if (allocation_failed) {
+        break;
+      }
+      int processed = 0;
+      RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index, 
&processed,
+          &allocation_failed));
+      start_index += processed;
+      *num_processed += processed;
+    }
+  }
+
   // Clear any temporary allocations made while materializing the sort tuples.
   expr_results_pool_.Clear();
   return Status::OK();
@@ -1096,6 +1234,45 @@ Status Sorter::InputDone() {
   // Sort the tuples in the last run.
   RETURN_IF_ERROR(SortCurrentInputRun());
 
+  if (inmem_run_max_pages_ > 0) {
+    sorted_inmem_runs_.push_back(unsorted_run_);
+    unsorted_run_ = nullptr;
+    if (!HasSpilledRuns()) {
+      if (sorted_inmem_runs_.size() == 1) {
+        DCHECK(sorted_inmem_runs_.back()->is_pinned());
+        DCHECK(merge_output_run_ == nullptr);
+        RETURN_IF_ERROR(sorted_inmem_runs_.back()->PrepareRead());
+        return Status::OK();
+      }
+      if (enable_spilling_) {
+        DCHECK(merge_output_run_ != nullptr);
+        merge_output_run_->CloseAllPages();
+        merge_output_run_ = nullptr;
+      }
+      // 'merge_output_run_' is not initialized for partial sort, because the 
output
+      // will be read directly from the merger.
+      DCHECK(enable_spilling_ || merge_output_run_ == nullptr);
+      return CreateMerger(sorted_inmem_runs_.size(), false);
+    }
+    DCHECK(enable_spilling_);
+
+    if (sorted_inmem_runs_.size() == 1) {
+      sorted_runs_.push_back(sorted_inmem_runs_.back());
+      sorted_inmem_runs_.clear();
+      DCHECK_GT(sorted_runs_.back()->run_size(), 0);
+      RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
+    } else {
+      RETURN_IF_ERROR(MergeInMemoryRuns());
+    }
+    DCHECK(sorted_inmem_runs_.empty());
+    // Merge intermediate runs until we have a final merge set-up.
+    return MergeIntermediateRuns();
+  } else {
+    sorted_runs_.push_back(unsorted_run_);
+  }
+
+  unsorted_run_ = nullptr;
+
   if (sorted_runs_.size() == 1) {
     // The entire input fit in one run. Read sorted rows in GetNext() directly 
from the
     // in-memory sorted run.
@@ -1114,10 +1291,19 @@ Status Sorter::InputDone() {
   return MergeIntermediateRuns();
 }
 
+
 Status Sorter::GetNext(RowBatch* output_batch, bool* eos) {
-  if (sorted_runs_.size() == 1) {
+  if (sorted_inmem_runs_.size() == 1 && !HasSpilledRuns()) {
+    DCHECK(sorted_inmem_runs_.back()->is_pinned());
+    return sorted_inmem_runs_.back()->GetNext<false>(output_batch, eos);
+  } else if (inmem_run_max_pages_ == 0 && sorted_runs_.size() == 1) {
     DCHECK(sorted_runs_.back()->is_pinned());
     return sorted_runs_.back()->GetNext<false>(output_batch, eos);
+  } else if (inmem_run_max_pages_ > 0 && !HasSpilledRuns()) {
+    RETURN_IF_ERROR(merger_->GetNext(output_batch, eos));
+    // Clear any temporary allocations made by the merger.
+    expr_results_pool_.Clear();
+    return Status::OK();
   } else {
     RETURN_IF_ERROR(merger_->GetNext(output_batch, eos));
     // Clear any temporary allocations made by the merger.
@@ -1144,6 +1330,7 @@ void Sorter::Close(RuntimeState* state) {
 }
 
 void Sorter::CleanupAllRuns() {
+  Run::CleanupRuns(&sorted_inmem_runs_);
   Run::CleanupRuns(&sorted_runs_);
   Run::CleanupRuns(&merging_runs_);
   if (unsorted_run_ != nullptr) unsorted_run_->CloseAllPages();
@@ -1160,10 +1347,8 @@ Status Sorter::SortCurrentInputRun() {
     SCOPED_TIMER(in_mem_sort_timer_);
     RETURN_IF_ERROR(in_mem_tuple_sorter_->Sort(unsorted_run_));
   }
-  sorted_runs_.push_back(unsorted_run_);
   sorted_data_size_->Add(unsorted_run_->TotalBytes());
   run_sizes_->UpdateCounter(unsorted_run_->num_tuples());
-  unsorted_run_ = nullptr;
 
   RETURN_IF_CANCELLED(state_);
   return Status::OK();
@@ -1236,8 +1421,31 @@ int Sorter::GetNumOfRunsForMerge() const {
   return max_runs_in_next_merge;
 }
 
+Status Sorter::MergeInMemoryRuns() {
+  DCHECK_GE(sorted_inmem_runs_.size(), 2);
+  DCHECK_GT(sorted_inmem_runs_.back()->run_size(), 0);
+
+  // No need to allocate more memory before doing in-memory merges, because the
+  // buffers of the in-memory runs are already open and they fit into memory.
+  // The merge output run is already initialized, too.
+
+  DCHECK(merge_output_run_ != nullptr) << "Should have initialized output run 
for merge.";
+  RETURN_IF_ERROR(CreateMerger(sorted_inmem_runs_.size(), false));
+  {
+    SCOPED_TIMER(in_mem_merge_timer_);
+    RETURN_IF_ERROR(ExecuteIntermediateMerge(merge_output_run_));
+  }
+  spilled_runs_counter_->Add(1);
+  sorted_runs_.push_back(merge_output_run_);
+  DCHECK_GT(sorted_runs_.back()->fixed_len_size(), 0);
+  merge_output_run_ = nullptr;
+  DCHECK(sorted_inmem_runs_.empty());
+  return Status::OK();
+}
+
 Status Sorter::MergeIntermediateRuns() {
   DCHECK_GE(sorted_runs_.size(), 2);
+  DCHECK_GT(sorted_runs_.back()->fixed_len_size(), 0);
 
   // Attempt to allocate more memory before doing intermediate merges. This may
   // be possible if other operators have relinquished memory after the sort 
has built
@@ -1248,7 +1456,7 @@ Status Sorter::MergeIntermediateRuns() {
     int num_of_runs_to_merge = GetNumOfRunsForMerge();
 
     DCHECK(merge_output_run_ == nullptr) << "Should have finished previous 
merge.";
-    RETURN_IF_ERROR(CreateMerger(num_of_runs_to_merge));
+    RETURN_IF_ERROR(CreateMerger(num_of_runs_to_merge, true));
 
     // If CreateMerger() consumed all the sorted runs, we have set up the 
final merge.
     if (sorted_runs_.empty()) return Status::OK();
@@ -1263,9 +1471,16 @@ Status Sorter::MergeIntermediateRuns() {
   return Status::OK();
 }
 
-Status Sorter::CreateMerger(int num_runs) {
+Status Sorter::CreateMerger(int num_runs, bool external) {
+  std::deque<impala::Sorter::Run *>* runs_to_merge;
+
+  if (external) {
+    DCHECK_GE(sorted_runs_.size(), 2);
+    runs_to_merge = &sorted_runs_;
+  } else {
+    runs_to_merge = &sorted_inmem_runs_;
+  }
   DCHECK_GE(num_runs, 2);
-  DCHECK_GE(sorted_runs_.size(), 2);
   // Clean up the runs from the previous merge.
   Run::CleanupRuns(&merging_runs_);
 
@@ -1273,24 +1488,25 @@ Status Sorter::CreateMerger(int num_runs) {
   // from the runs being merged. This is unnecessary overhead that is not 
required if we
   // correctly transfer resources.
   merger_.reset(
-      new SortedRunMerger(*compare_less_than_, output_row_desc_, profile_, 
true,
+      new SortedRunMerger(*compare_less_than_, output_row_desc_, profile_, 
external,
           codegend_heapify_helper_fn_));
 
   vector<function<Status (RowBatch**)>> merge_runs;
   merge_runs.reserve(num_runs);
   for (int i = 0; i < num_runs; ++i) {
-    Run* run = sorted_runs_.front();
+    Run* run = runs_to_merge->front();
     RETURN_IF_ERROR(run->PrepareRead());
 
     // Run::GetNextBatch() is used by the merger to retrieve a batch of rows 
to merge
     // from this run.
     merge_runs.emplace_back(bind<Status>(mem_fn(&Run::GetNextBatch), run, _1));
-    sorted_runs_.pop_front();
+    runs_to_merge->pop_front();
     merging_runs_.push_back(run);
   }
   RETURN_IF_ERROR(merger_->Prepare(merge_runs));
-
-  num_merges_counter_->Add(1);
+  if (external) {
+    num_merges_counter_->Add(1);
+  }
   return Status::OK();
 }
 
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index c83bf81cc..39617e8e3 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -105,7 +105,7 @@ class Sorter {
   /// 'enable_spilling' should be set to false to reduce the number of 
requested buffers
   /// if the caller will use AddBatchNoSpill().
   /// 'codegend_sort_helper_fn' is a reference to the codegen version of
-  /// the Sorter::TupleSorter::SortHelp() method.
+  /// the Sorter::TupleSorter::SortHelper() method.
   /// 'estimated_input_size' is the total rows in bytes that are estimated to 
get added
   /// into this sorter. This is used to decide if sorter needs to proactively 
spill for
   /// the first run. -1 value means estimate is unavailable.
@@ -166,20 +166,39 @@ class Sorter {
   /// Return true if the sorter has any spilled runs.
   bool HasSpilledRuns() const;
 
+  /// The logic in AddBatchNoSpill() that handles the different cases when
+  /// AddBatchInternal() returns without processing the entire batch.
+  /// In this case we need to initialize a new minirun for the incoming rows.
+  /// Tries initializing a new minirun. If the initialization was successful,
+  /// allocation_failed is set to false, otherwise (e.g. a memory limit is 
hit) true.
+  Status InitializeNewMinirun(bool* allocation_failed) WARN_UNUSED_RESULT;
+
  private:
   class Page;
   class Run;
 
-  /// Minimum value for sot_run_bytes_limit query option.
+  /// Minimum value for sort_run_bytes_limit query option.
   static const int64_t MIN_SORT_RUN_BYTES_LIMIT = 32 << 20; // 32 MB
 
+  /// Merges multiple smaller runs in sorted_inmem_runs_ into a single larger 
merged
+  /// run that can be spilled page by page during the process, until all pages 
from
+  /// sorted_inmem_runs are consumed and freed.
+  /// Always performs one-level merge and spills the output run to disc. 
Therefore there
+  /// is no need to deep-copy the input, since the pages containing var-len 
data
+  /// remain in the memory until the end of the in-memory merge.
+  /// TODO: delete pages that are consumed during merge asap.
+  Status MergeInMemoryRuns() WARN_UNUSED_RESULT;
+
   /// Create a SortedRunMerger from sorted runs in 'sorted_runs_' and assign 
it to
   /// 'merger_'. 'num_runs' indicates how many runs should be covered by the 
current
-  /// merging attempt. Returns error if memory allocation fails during in
-  /// Run::PrepareRead(). The runs to be merged are removed from 
'sorted_runs_'. The
+  /// merging attempt. Returns error if memory allocation fails in 
Run::PrepareRead().
+  /// The runs to be merged are removed from 'sorted_runs_'.
+  /// If 'external' is set to true, it performs an external merge, and the
   /// Sorter sets the 'deep_copy_input' flag to true for the merger, since the 
pages
   /// containing input run data will be deleted as input runs are read.
-  Status CreateMerger(int num_runs) WARN_UNUSED_RESULT;
+  /// If 'external' is false, it creates an in-memory merger for the in-memory 
miniruns.
+  /// In this case, 'deep_copy_input' is set to false.
+  Status CreateMerger(int num_runs, bool external) WARN_UNUSED_RESULT;
 
   /// Repeatedly replaces multiple smaller runs in sorted_runs_ with a single 
larger
   /// merged run until there are few enough runs to be merged with a single 
merger.
@@ -193,6 +212,10 @@ class Sorter {
   /// and adding them to 'merged_run'.
   Status ExecuteIntermediateMerge(Sorter::Run* merged_run) WARN_UNUSED_RESULT;
 
+  /// Handles cases in AddBatch where a memory limit is reached and spilling 
must start.
+  /// Used only in case of multiple in-memory runs set by query option 
MAX_SORT_RUN_SIZE.
+  inline Status MergeAndSpill() WARN_UNUSED_RESULT;
+
   /// Called once there no more rows to be added to 'unsorted_run_'. Sorts
   /// 'unsorted_run_' and appends it to the list of sorted runs.
   Status SortCurrentInputRun() WARN_UNUSED_RESULT;
@@ -266,8 +289,8 @@ class Sorter {
   const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn>& 
codegend_heapify_helper_fn_;
 
   /// A default codegened function pointer storing nullptr, which is used when 
the
-  /// merger is not needed. Used as a default value in constructor, when the 
CodegenFnPtr
-  /// is not provided.
+  /// merger is not needed. Used as a default value in the constructor, when 
the
+  /// CodegenFnPtr is not provided.
   static const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn> 
default_heapify_helper_fn_;
 
   /// Client used to allocate pages from the buffer pool. Not owned.
@@ -302,9 +325,13 @@ class Sorter {
   /// When it is added to sorted_runs_, it is set to NULL.
   Run* unsorted_run_;
 
-  /// List of sorted runs that have been produced but not merged. 
unsorted_run_ is added
-  /// to this list after an in-memory sort. Sorted runs produced by 
intermediate merges
-  /// are also added to this list during the merge. Runs are added to the 
object pool.
+  /// List of quicksorted miniruns before merging in memory.
+  std::deque<Run*> sorted_inmem_runs_;
+
+  /// List of sorted runs that have been produced but not merged. 
'unsorted_run_' is
+  /// added to this list after an in-memory sort. Sorted runs produced by 
intermediate
+  /// merges are also added to this list during the merge. Runs are added to 
the
+  /// object pool.
   std::deque<Run*> sorted_runs_;
 
   /// Merger object (intermediate or final) currently used to produce sorted 
runs.
@@ -345,6 +372,9 @@ class Sorter {
   /// Time spent sorting initial runs in memory.
   RuntimeProfile::Counter* in_mem_sort_timer_;
 
+  /// Time spent merging initial miniruns in memory.
+  RuntimeProfile::Counter* in_mem_merge_timer_;
+
   /// Total size of the initial runs in bytes.
   RuntimeProfile::Counter* sorted_data_size_;
 
@@ -353,6 +383,12 @@ class Sorter {
 
   /// Flag to enforce sort_run_bytes_limit.
   bool enforce_sort_run_bytes_limit_ = false;
+
+  /// Maximum number of fixed-length + variable-length pages in an in-memory 
run set by
+  /// Query Option MAX_SORT_RUN_SIZE.
+  /// The default value is 0 which means that only 1 in-memory run will be 
created, and
+  /// its size will be determined by other limits eg. memory or 
sort_run_bytes_limit.
+  int inmem_run_max_pages_ = 0;
 };
 
 } // namespace impala
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index da959234d..af0a74463 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1099,6 +1099,15 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
         query_options->__set_max_fragment_instances_per_node(max_num);
         break;
       }
+      case TImpalaQueryOptions::MAX_SORT_RUN_SIZE: {
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+            option, value, &int32_t_val));
+        RETURN_IF_ERROR(
+            QueryOptionValidator<int32_t>::NotEquals(option, int32_t_val, 1));
+        query_options->__set_max_sort_run_size(int32_t_val);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << 
key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index db47664ec..f88e5fc74 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                       
          \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                       
          \
-      TImpalaQueryOptions::MAX_FRAGMENT_INSTANCES_PER_NODE + 1);               
          \
+      TImpalaQueryOptions::MAX_SORT_RUN_SIZE + 1);                             
          \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, 
ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)     
          \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)   
          \
@@ -291,7 +291,8 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(join_selectivity_correlation_factor, 
JOIN_SELECTIVITY_CORRELATION_FACTOR, \
       TQueryOptionLevel::ADVANCED)                                             
          \
   QUERY_OPT_FN(max_fragment_instances_per_node, 
MAX_FRAGMENT_INSTANCES_PER_NODE,         \
-      TQueryOptionLevel::ADVANCED);
+      TQueryOptionLevel::ADVANCED);                                            
          \
+  QUERY_OPT_FN(max_sort_run_size, MAX_SORT_RUN_SIZE, 
TQueryOptionLevel::DEVELOPMENT)     ;
 
 /// Enforce practical limits on some query options to avoid undesired query 
state.
 static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index c17253cc3..70d59d2c9 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -59,6 +59,8 @@ class ComparatorWrapper {
   }
 };
 
+class TupleRowComparator;
+
 /// TupleRowComparatorConfig contains the static state initialized from its 
corresponding
 /// thrift structure. It serves as an input for creating instances of the
 /// TupleRowComparator class.
@@ -102,8 +104,8 @@ class TupleRowComparatorConfig {
   std::vector<int8_t> nulls_first_;
 
   /// Codegened version of TupleRowComparator::Compare().
-  typedef int (*CompareFn)(ScalarExprEvaluator* const*, ScalarExprEvaluator* 
const*,
-      const TupleRow*, const TupleRow*);
+  typedef int (*CompareFn)(const TupleRowComparator*, ScalarExprEvaluator* 
const*,
+      ScalarExprEvaluator* const*, const TupleRow*, const TupleRow*);
   CodegenFnPtr<CompareFn> codegend_compare_fn_;
 
  private:
@@ -156,8 +158,19 @@ class TupleRowComparator {
   /// hot loops.
   bool ALWAYS_INLINE Less(const TupleRow* lhs, const TupleRow* rhs) const {
     return Compare(
-               ordering_expr_evals_lhs_.data(), 
ordering_expr_evals_rhs_.data(), lhs, rhs)
-        < 0;
+        ordering_expr_evals_lhs_.data(), ordering_expr_evals_rhs_.data(), lhs, 
rhs) < 0;
+  }
+
+  bool ALWAYS_INLINE LessCodegend(const TupleRow* lhs, const TupleRow* rhs) 
const {
+    if (codegend_compare_fn_non_atomic_ != nullptr) {
+      return codegend_compare_fn_non_atomic_(this,
+          ordering_expr_evals_lhs_.data(), ordering_expr_evals_rhs_.data(), 
lhs, rhs) < 0;
+    } else {
+      TupleRowComparatorConfig::CompareFn fn = codegend_compare_fn_.load();
+      if (fn != nullptr) codegend_compare_fn_non_atomic_ = fn;
+    }
+    return Compare(
+        ordering_expr_evals_lhs_.data(), ordering_expr_evals_rhs_.data(), lhs, 
rhs) < 0;
   }
 
   bool ALWAYS_INLINE Less(const Tuple* lhs, const Tuple* rhs) const {
@@ -197,6 +210,7 @@ class TupleRowComparator {
   /// Reference to the codegened function pointer owned by the 
TupleRowComparatorConfig
   /// object that was used to create this instance.
   const CodegenFnPtr<TupleRowComparatorConfig::CompareFn>& 
codegend_compare_fn_;
+  mutable TupleRowComparatorConfig::CompareFn codegend_compare_fn_non_atomic_ 
= nullptr;
 
  private:
   /// Interpreted implementation of Compare().
diff --git a/bin/perf_tools/perf-query.sh b/bin/perf_tools/perf-query.sh
index 8e44491d6..3e6ace7d1 100755
--- a/bin/perf_tools/perf-query.sh
+++ b/bin/perf_tools/perf-query.sh
@@ -64,7 +64,7 @@ sudo echo "test sudo"
 sudo perf record -F 99 -g -a &
 perf_pid=$!
 
-~/Impala/bin/impala-shell.sh -q "$1"
+${IMPALA_HOME}/bin/impala-shell.sh -q "$1"
 
 # Send interrupt to 'perf record'. We need to issue 'kill' in a new 
session/process
 # group via 'setsid', otherwise 'perf record' won't get the signal (because 
it's
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index e501d9cc2..26651a00f 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -785,6 +785,16 @@ enum TImpalaQueryOptions {
   // PROCESSING_COST_MIN_THREADS option has higher value.
   // Valid values are in [1, 128]. Default to 128.
   MAX_FRAGMENT_INSTANCES_PER_NODE = 156
+
+  // Configures the in-memory sort algorithm used in the sorter. Determines the
+  // maximum number of pages in an initial in-memory run (fixed + variable 
length).
+  // 0 means unlimited, which will create 1 big run with no in-memory merge 
phase.
+  // Setting any other other value can create multiple miniruns which leads to 
an
+  // in-memory merge phase. The minimum value in that case is 2.
+  // Generally, with larger workloads the recommended value is 10 or more to 
avoid
+  // high fragmentation of variable length data.
+  MAX_SORT_RUN_SIZE = 157;
+
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index f482d21b5..2e0157a96 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -614,6 +614,7 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   150: optional bool disable_codegen_cache = false;
+
   151: optional TCodeGenCacheMode codegen_cache_mode = 
TCodeGenCacheMode.NORMAL;
 
   // See comment in ImpalaService.thrift
@@ -633,6 +634,10 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   157: optional i32 max_fragment_instances_per_node = 
MAX_FRAGMENT_INSTANCES_PER_NODE
+
+  // Configures the in-memory sort algorithm used in the sorter.
+  // See comment in ImpalaService.thrift
+  158: optional i32 max_sort_run_size = 0;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and 
external
diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py
index e536b0134..82e43a78f 100644
--- a/tests/query_test/test_sort.py
+++ b/tests/query_test/test_sort.py
@@ -21,6 +21,10 @@ from copy import copy, deepcopy
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfNotHdfsMinicluster
+from tests.common.test_vector import ImpalaTestDimension
+
+# Run sizes (number of pages per run) in sorter
+MAX_SORT_RUN_SIZE = [0, 2, 20]
 
 
 def split_result_rows(result):
@@ -55,6 +59,8 @@ class TestQueryFullSort(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestQueryFullSort, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size',
+        *MAX_SORT_RUN_SIZE))
 
     if cls.exploration_strategy() == 'core':
       cls.ImpalaTestMatrix.add_constraint(lambda v:\
@@ -229,10 +235,21 @@ class TestQueryFullSort(ImpalaTestSuite):
 class TestRandomSort(ImpalaTestSuite):
   @classmethod
   def get_workload(self):
-    return 'functional'
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestRandomSort, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size',
+        *MAX_SORT_RUN_SIZE))
+
+    if cls.exploration_strategy() == 'core':
+      cls.ImpalaTestMatrix.add_constraint(lambda v:
+          v.get_value('table_format').file_format == 'parquet')
 
-  def test_order_by_random(self):
+  def test_order_by_random(self, vector):
     """Tests that 'order by random()' works as expected."""
+    exec_option = copy(vector.get_value('exec_option'))
     # "order by random()" with different seeds should produce different 
orderings.
     seed_query = "select * from functional.alltypestiny order by random(%s)"
     results_seed0 = self.execute_query(seed_query % "0")
@@ -242,8 +259,8 @@ class TestRandomSort(ImpalaTestSuite):
 
     # Include "random()" in the select list to check that it's sorted 
correctly.
     results = transpose_results(self.execute_query(
-        "select random() as r from functional.alltypessmall order by r").data,
-        lambda x: float(x))
+        "select random() as r from functional.alltypessmall order by r",
+        exec_option).data, lambda x: float(x))
     assert(results[0] == sorted(results[0]))
 
     # Like above, but with a limit.
@@ -254,22 +271,40 @@ class TestRandomSort(ImpalaTestSuite):
 
     # "order by random()" inside an inline view.
     query = "select r from (select random() r from functional.alltypessmall) v 
order by r"
-    results = transpose_results(self.execute_query(query).data, lambda x: 
float(x))
+    results = transpose_results(self.execute_query(query, exec_option).data,
+        lambda x: float(x))
     assert (results == sorted(results))
 
-  def test_analytic_order_by_random(self):
+  def test_analytic_order_by_random(self, vector):
     """Tests that a window function over 'order by random()' works as 
expected."""
+    exec_option = copy(vector.get_value('exec_option'))
     # Since we use the same random seed, the results should be returned in 
order.
     query = """select last_value(rand(2)) over (order by rand(2)) from
       functional.alltypestiny"""
-    results = transpose_results(self.execute_query(query).data, lambda x: 
float(x))
+    results = transpose_results(self.execute_query(query, exec_option).data,
+        lambda x: float(x))
     assert (results == sorted(results))
 
 
+
 class TestPartialSort(ImpalaTestSuite):
   """Test class to do functional validation of partial sorts."""
 
-  def test_partial_sort_min_reservation(self, unique_database):
+  @classmethod
+  def get_workload(self):
+    return 'tpch'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestPartialSort, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size',
+        *MAX_SORT_RUN_SIZE))
+
+    if cls.exploration_strategy() == 'core':
+      cls.ImpalaTestMatrix.add_constraint(lambda v:
+          v.get_value('table_format').file_format == 'parquet')
+
+  def test_partial_sort_min_reservation(self, vector, unique_database):
     """Test that the partial sort node can operate if it only gets its minimum
     memory reservation."""
     table_name = "%s.kudu_test" % unique_database
@@ -277,10 +312,36 @@ class TestPartialSort(ImpalaTestSuite):
         "debug_action", "-1:OPEN:[email protected]")
     self.execute_query("""create table %s (col0 string primary key)
         partition by hash(col0) partitions 8 stored as kudu""" % table_name)
+    exec_option = copy(vector.get_value('exec_option'))
     result = self.execute_query(
-        "insert into %s select string_col from functional.alltypessmall" % 
table_name)
+        "insert into %s select string_col from functional.alltypessmall" % 
table_name,
+        exec_option)
     assert "PARTIAL SORT" in result.runtime_profile, result.runtime_profile
 
+  def test_partial_sort_kudu_insert(self, vector, unique_database):
+    table_name = "%s.kudu_partial_sort_test" % unique_database
+    self.execute_query("""create table %s (l_linenumber INT, l_orderkey BIGINT,
+      l_partkey BIGINT, l_shipdate STRING, l_quantity DECIMAL(12,2),
+      l_comment STRING, PRIMARY KEY(l_linenumber, l_orderkey) )
+      PARTITION BY RANGE (l_linenumber)
+      (
+        PARTITION VALUE = 1,
+        PARTITION VALUE = 2,
+        PARTITION VALUE = 3,
+        PARTITION VALUE = 4,
+        PARTITION VALUE = 5,
+        PARTITION VALUE = 6,
+        PARTITION VALUE = 7
+      )
+      STORED AS KUDU""" % table_name)
+    exec_option = copy(vector.get_value('exec_option'))
+    result = self.execute_query(
+        """insert into %s SELECT l_linenumber, l_orderkey, l_partkey, 
l_shipdate,
+        l_quantity, l_comment FROM tpch.lineitem limit 300000""" % table_name,
+        exec_option)
+    assert "NumModifiedRows: 300000" in result.runtime_profile, 
result.runtime_profile
+    assert "NumRowErrors: 0" in result.runtime_profile, result.runtime_profile
+
 
 class TestArraySort(ImpalaTestSuite):
   """Tests where there are arrays in the sorting tuple."""
@@ -292,7 +353,8 @@ class TestArraySort(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestArraySort, cls).add_test_dimensions()
-
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size',
+        *MAX_SORT_RUN_SIZE))
     # The table we use is a parquet table.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format == 'parquet')


Reply via email to