IMPALA-1346/1590/2344: fix sorter buffer mgmt when spilling

The Sorter's memory management logic failed to correctly manage buffers
when spilling. It would try to make use of all buffers in the system,
neglecting to account for other operators' buffer usage.

This patch adjusts the logic so that the sorter tolerates contention
for buffers, provided it can get enough buffers to make progress.
Instead of precalculating the number of buffers it thinks it should be
able to pin, it makes a best-effort attempt to pin the initial buffers
of as many runs as possible, up to a limit. As long as it can pin three
runs, it can make progress.
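
The best-effort strategy described above can be sketched roughly as
follows. This is a simplified illustration, not the code in sorter.cc:
BufferPool, TryPin, PrepareRunsBestEffort and MergeAll are hypothetical
names, each run is assumed to need exactly one pinned buffer, and the
final merge is assumed to hold an output buffer too, which the real
patch avoids.

```cpp
#include <algorithm>

// Hypothetical stand-in for the block manager: only tracks free buffers.
struct BufferPool {
  int free_buffers;
  // Returns false instead of failing when no buffer is available.
  bool TryPin() {
    if (free_buffers == 0) return false;
    --free_buffers;
    return true;
  }
  void Unpin(int n) { free_buffers += n; }
};

// Pin the initial buffer of as many runs as possible, up to 'max_runs'.
// Returns the number of runs prepared, or 0 (after releasing any pins)
// if fewer than two runs could be pinned, since a merge of fewer than
// two runs makes no progress.
int PrepareRunsBestEffort(BufferPool* pool, int available_runs, int max_runs) {
  int limit = std::min(available_runs, max_runs);
  int prepared = 0;
  while (prepared < limit && pool->TryPin()) ++prepared;
  if (prepared < 2) {
    pool->Unpin(prepared);
    return 0;
  }
  return prepared;
}

// Repeatedly merge runs until one remains. Returns the number of merges
// performed, or -1 if a merge could not make progress.
int MergeAll(BufferPool* pool, int num_runs, int max_runs_per_merge) {
  int merges = 0;
  while (num_runs > 1) {
    // One buffer for the run being written by this merge.
    if (!pool->TryPin()) return -1;
    int merged = PrepareRunsBestEffort(pool, num_runs, max_runs_per_merge);
    if (merged == 0) {
      pool->Unpin(1);
      return -1;
    }
    // The merge consumes 'merged' input runs and produces one output run.
    pool->Unpin(merged + 1);
    num_runs = num_runs - merged + 1;
    ++merges;
  }
  return merges;
}
```

With only three free buffers, this sketch still finishes a five-run
sort by merging two runs at a time; with two free buffers it reports
failure rather than assuming more buffers exist.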

Testing:
Added an additional test that failed before the patch without OOM.
An analytic function test that was meant to fail also started succeeding
so I had to adjust the limit there too.

Change-Id: Idfe55cc13c7f2b54cba1d05ade44cbcf6bb573c0
Reviewed-on: http://gerrit.cloudera.org:8080/2908
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Tim Armstrong <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ee53ddb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ee53ddb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ee53ddb3

Branch: refs/heads/master
Commit: ee53ddb389549247f5bfe760d446dc7b3b963a29
Parents: 37ec253
Author: Tim Armstrong <[email protected]>
Authored: Wed Apr 20 01:11:27 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Mon Jun 6 17:34:07 2016 -0700

----------------------------------------------------------------------
 be/src/runtime/buffered-block-mgr.h             |   6 -
 be/src/runtime/sorter.cc                        | 228 +++++++++++--------
 be/src/runtime/sorter.h                         |  24 +-
 .../queries/QueryTest/analytic-fns.test         |   7 +-
 .../queries/QueryTest/spilling.test             |  39 ++++
 5 files changed, 188 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee53ddb3/be/src/runtime/buffered-block-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h
index 9e54c4f..78fdb37 100644
--- a/be/src/runtime/buffered-block-mgr.h
+++ b/be/src/runtime/buffered-block-mgr.h
@@ -389,12 +389,6 @@ class BufferedBlockMgr {
   /// only for error reporting.
   Status MemLimitTooLowError(Client* client, int node_id);
 
-  /// TODO: Remove these two. Not clear what the sorter really needs.
-  /// TODO: Those are dirty, dangerous reads to two lists whose all other accesses are
-  /// protected by the lock_. Using those two functions is looking for trouble.
-  int available_allocated_buffers() const { return all_io_buffers_.size(); }
-  int num_free_buffers() const { return free_io_buffers_.size(); }
-
   int num_pinned_buffers(Client* client) const;
   int num_reserved_buffers_remaining(Client* client) const;
   MemTracker* get_tracker(Client* client) const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee53ddb3/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index c242b6b..2e9e6ec 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -32,20 +32,26 @@ using namespace strings;
 
 namespace impala {
 
-// Number of pinned blocks required for a merge.
-const int BLOCKS_REQUIRED_FOR_MERGE = 3;
+// Number of pinned blocks required for a merge with fixed-length data only.
+const int MIN_BUFFERS_PER_MERGE = 3;
 
-// Error message when pinning fixed or variable length blocks failed.
-// TODO: Add the node id that iniated the sort
-const string PIN_FAILED_ERROR_MSG = "Failed to pin block for $0-length data needed "
-    "for sorting. Reducing query concurrency or increasing the memory limit may help "
-    "this query to complete successfully.";
+// Maximum number of buffers to use in each merge to prevent sorter trying to grab
+// all the memory when spilling. Given 8mb buffers, this limits the sorter to using
+// 1GB of buffers when merging.
+// TODO: this is an arbitrary limit. Once we have reliable reservations (IMPALA-3200)
+// we should base this on the number of reservations.
+const int MAX_BUFFERS_PER_MERGE = 128;
 
 const string MEM_ALLOC_FAILED_ERROR_MSG = "Failed to allocate block for $0-length "
     "data needed for sorting. Reducing query concurrency or increasing the "
     "memory limit may help this query to complete successfully.";
 
-/// Delete all non-null blocks in 'blocks' and clear vector.
+const string MERGE_FAILED_ERROR_MSG = "Failed to allocate block to merge spilled runs "
+    "during sorting. Only $0 runs could be merged, but must be able to merge at least 2 "
+    "to make progress. Reducing query concurrency or increasing the memory limit may "
+    "help this query to complete successfully.";
+
+/// Delete all non-null blocks in blocks and clear vector.
 static void DeleteAndClearBlocks(vector<BufferedBlockMgr::Block*>* blocks) {
   for (BufferedBlockMgr::Block* block: *blocks) {
     if (block != NULL) block->Delete();
@@ -135,8 +141,12 @@ class Sorter::Run {
   void DeleteAllBlocks();
 
   /// Prepare to read a sorted run. Pins the first block(s) in the run if the run was
-  /// previously unpinned.
-  Status PrepareRead();
+  /// previously unpinned. If the run was unpinned, try to pin the initial fixed and
+  /// var len blocks in the run. If it couldn't pin them, set pinned_all_blocks to false.
+  /// In that case, none or one of the initial blocks may be pinned and it is valid to
+  /// call PrepareRead() again to retry pinning the remainder. pinned_all_blocks is
+  /// always set to true if the run is pinned.
+  Status PrepareRead(bool* pinned_all_blocks);
 
   /// Interface for merger - get the next batch of rows from this run. This run still
   /// owns the returned batch. Calls GetNext(RowBatch*, bool*).
@@ -723,7 +733,7 @@ Status Sorter::Run::UnpinAllBlocks() {
   return Status::OK();
 }
 
-Status Sorter::Run::PrepareRead() {
+Status Sorter::Run::PrepareRead(bool* pinned_all_blocks) {
   DCHECK(is_finalized_);
   DCHECK(is_sorted_);
 
@@ -737,33 +747,32 @@ Status Sorter::Run::PrepareRead() {
       sorter_->state_->batch_size(), sorter_->mem_tracker_));
 
   // If the run is pinned, all blocks are already pinned, so we're ready to read.
-  if (is_pinned_) return Status::OK();
+  if (is_pinned_) {
+    *pinned_all_blocks = true;
+    return Status::OK();
+  }
 
   // Attempt to pin the first fixed and var-length blocks. In either case, pinning may
   // fail if the number of reserved blocks is oversubscribed, see IMPALA-1590.
   if (fixed_len_blocks_.size() > 0) {
     bool pinned;
     RETURN_IF_ERROR(fixed_len_blocks_[0]->Pin(&pinned));
-    // Temporary work-around for IMPALA-1868. Fail the query with OOM rather than
-    // DCHECK in case block pin fails.
     if (!pinned) {
-      Status status = Status::MemLimitExceeded();
-      status.AddDetail(Substitute(PIN_FAILED_ERROR_MSG, "fixed"));
-      return status;
+      *pinned_all_blocks = false;
+      return Status::OK();
     }
   }
 
   if (HasVarLenBlocks()) {
     bool pinned;
     RETURN_IF_ERROR(var_len_blocks_[0]->Pin(&pinned));
-    // Temporary work-around for IMPALA-1590. Fail the query with OOM rather than
-    // DCHECK in case block pin fails.
     if (!pinned) {
-      Status status = Status::MemLimitExceeded();
-      status.AddDetail(Substitute(PIN_FAILED_ERROR_MSG, "variable"));
-      return status;
+      *pinned_all_blocks = false;
+      return Status::OK();
     }
   }
+
+  *pinned_all_blocks = true;
   return Status::OK();
 }
 
@@ -1328,6 +1337,7 @@ Sorter::Sorter(const TupleRowComparator& compare_less_than,
     mem_tracker_(mem_tracker),
     output_row_desc_(output_row_desc),
     unsorted_run_(NULL),
+    merge_output_run_(NULL),
     profile_(profile),
     initial_runs_counter_(NULL),
     num_merges_counter_(NULL),
@@ -1339,6 +1349,7 @@ Sorter::~Sorter() {
   DCHECK(sorted_runs_.empty());
   DCHECK(merging_runs_.empty());
   DCHECK(unsorted_run_ == NULL);
+  DCHECK(merge_output_run_ == NULL);
 }
 
 Status Sorter::Init() {
@@ -1355,13 +1366,13 @@ Status Sorter::Init() {
   in_mem_sort_timer_ = ADD_TIMER(profile_, "InMemorySortTime");
   sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES);
 
-  int min_blocks_required = BLOCKS_REQUIRED_FOR_MERGE;
-  // Fixed and var-length blocks are separate, so we need BLOCKS_REQUIRED_FOR_MERGE
+  int min_buffers_required = MIN_BUFFERS_PER_MERGE;
+  // Fixed and var-length blocks are separate, so we need MIN_BUFFERS_PER_MERGE
   // blocks for both if there is var-length data.
-  if (has_var_len_slots_) min_blocks_required *= 2;
+  if (has_var_len_slots_) min_buffers_required *= 2;
 
   RETURN_IF_ERROR(block_mgr_->RegisterClient(Substitute("Sorter ptr=$0", this),
-      min_blocks_required, false, mem_tracker_, state_, &block_mgr_client_));
+      min_buffers_required, false, mem_tracker_, state_, &block_mgr_client_));
 
   DCHECK(unsorted_run_ != NULL);
   RETURN_IF_ERROR(unsorted_run_->Init());
@@ -1394,44 +1405,25 @@ Status Sorter::InputDone() {
   RETURN_IF_ERROR(SortCurrentInputRun());
 
   if (sorted_runs_.size() == 1) {
-    // The entire input fit in one run. Read sorted rows in GetNext() directly
-    // from the sorted run.
-    RETURN_IF_ERROR(sorted_runs_.back()->PrepareRead());
+    // The entire input fit in one run. Read sorted rows in GetNext() directly from the
+    // in-memory sorted run.
+    DCHECK(sorted_runs_.back()->is_pinned());
+    bool success;
+    RETURN_IF_ERROR(sorted_runs_.back()->PrepareRead(&success));
+    DCHECK(success) << "Should always be able to prepare pinned run for read.";
     return Status::OK();
   }
 
-  // At least one merge is necessary.
-  int blocks_per_run = has_var_len_slots_ ? 2 : 1;
-  int min_buffers_for_merge = sorted_runs_.size() * blocks_per_run;
-  // Check if the final run needs to be unpinned.
-  bool unpinned_final = false;
-  if (block_mgr_->num_free_buffers() < min_buffers_for_merge - blocks_per_run) {
-    // Number of available buffers is less than the size of the final run and
-    // the buffers needed to read the remainder of the runs in memory.
-    // Unpin the final run.
-    RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks());
-    unpinned_final = true;
-  } else {
-    // No need to unpin the current run. There is enough memory to stream the
-    // other runs.
-    // TODO: revisit. It might be better to unpin some from this run if it means
-    // we can get double buffering in the other runs.
-  }
+  // Unpin the final run to free up memory for the merge.
+  // TODO: we could keep it in memory in some circumstances as an optimisation, once
+  // we have a buffer pool with more reliable reservations (IMPALA-3200).
+  RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks());
 
-  // For an intermediate merge, intermediate_merge_batch contains deep-copied rows from
-  // the input runs. If (unmerged_sorted_runs_.size() > max_runs_per_final_merge),
-  // one or more intermediate merges are required.
+  // Merge intermediate runs until we have a final merge set-up.
   // TODO: Attempt to allocate more memory before doing intermediate merges. This may
   // be possible if other operators have relinquished memory after the sort has built
-  // its runs.
-  if (min_buffers_for_merge > block_mgr_->available_allocated_buffers()) {
-    DCHECK(unpinned_final);
-    RETURN_IF_ERROR(MergeIntermediateRuns());
-  }
-
-  // Create the final merger.
-  RETURN_IF_ERROR(CreateMerger(sorted_runs_.size()));
-  return Status::OK();
+  // its runs. This depends on more reliable reservations (IMPALA-3200)
+  return MergeIntermediateRuns();
 }
 
 Status Sorter::GetNext(RowBatch* output_batch, bool* eos) {
@@ -1444,7 +1436,7 @@ Status Sorter::GetNext(RowBatch* output_batch, bool* eos) {
 }
 
 Status Sorter::Reset() {
-  DCHECK(unsorted_run_ == NULL);
+  DCHECK(unsorted_run_ == NULL) << "Cannot Reset() before calling InputDone()";
   merger_.reset();
   // Free resources from the current runs.
   CleanupAllRuns();
@@ -1466,6 +1458,8 @@ void Sorter::CleanupAllRuns() {
   Run::CleanupRuns(&merging_runs_);
   if (unsorted_run_ != NULL) unsorted_run_->DeleteAllBlocks();
   unsorted_run_ = NULL;
+  if (merge_output_run_ != NULL) merge_output_run_->DeleteAllBlocks();
+  merge_output_run_ = NULL;
 }
 
 Status Sorter::SortCurrentInputRun() {
@@ -1484,55 +1478,52 @@ Status Sorter::SortCurrentInputRun() {
 }
 
 Status Sorter::MergeIntermediateRuns() {
-  int blocks_per_run = has_var_len_slots_ ? 2 : 1;
-  int max_runs_per_final_merge =
-      block_mgr_->available_allocated_buffers() / blocks_per_run;
+  DCHECK_GE(sorted_runs_.size(), 2);
+  int pinned_blocks_per_run = has_var_len_slots_ ? 2 : 1;
+  int max_runs_per_final_merge = MAX_BUFFERS_PER_MERGE / pinned_blocks_per_run;
 
-  // During an intermediate merge, blocks from the output sorted run will have to be
-  // pinned.
+  // During an intermediate merge, the one or two blocks from the output sorted run
+  // that are being written must be pinned.
   int max_runs_per_intermediate_merge = max_runs_per_final_merge - 1;
   DCHECK_GT(max_runs_per_intermediate_merge, 1);
-  // For an intermediate merge, intermediate_merge_batch contains deep-copied rows from
-  // the input runs. If (sorted_runs_.size() > max_runs_per_final_merge),
-  // one or more intermediate merges are required.
-  scoped_ptr<RowBatch> intermediate_merge_batch;
-  while (sorted_runs_.size() > max_runs_per_final_merge) {
+
+  while (true) {
     // An intermediate merge adds one merge to unmerged_sorted_runs_.
-    // Merging 'runs - (max_runs_final_ - 1)' number of runs is sufficient to guarantee
-    // that the final merge can be performed.
-    int num_runs_to_merge = min<int>(max_runs_per_intermediate_merge,
-        sorted_runs_.size() - max_runs_per_intermediate_merge);
-    RETURN_IF_ERROR(CreateMerger(num_runs_to_merge));
-    RowBatch intermediate_merge_batch(*output_row_desc_, state_->batch_size(),
-        mem_tracker_);
-    // 'merged_run' is the new sorted run that is produced by the intermediate merge.
-    // We added 'merged_run' to 'sorted_runs_' immediately so that it is cleaned up
-    // in Close().
-    Run* merged_run = obj_pool_.Add(
+    // TODO: once we have reliable reservations (IMPALA-3200), we should calculate this
+    // based on the available reservations.
+    int num_runs_to_merge =
+        min<int>(max_runs_per_intermediate_merge, sorted_runs_.size());
+
+    DCHECK(merge_output_run_ == NULL) << "Should have finished previous merge.";
+    // Create the merged run in case we need to do intermediate merges. We need the
+    // output run and at least two input runs in memory to make progress on the
+    // intermediate merges.
+    // TODO: this isn't optimal: we could defer creating the merged run if we have
+    // reliable reservations (IMPALA-3200).
+    merge_output_run_ = obj_pool_.Add(
         new Run(this, output_row_desc_->tuple_descriptors()[0], false));
-    sorted_runs_.push_back(merged_run);
-    RETURN_IF_ERROR(merged_run->Init());
-    bool eos = false;
-    while (!eos) {
-      // Copy rows into the new run until done.
-      int num_copied;
-      RETURN_IF_CANCELLED(state_);
-      RETURN_IF_ERROR(merger_->GetNext(&intermediate_merge_batch, &eos));
-      RETURN_IF_ERROR(
-          merged_run->AddIntermediateBatch(&intermediate_merge_batch, 0, &num_copied));
-
-      DCHECK_EQ(num_copied, intermediate_merge_batch.num_rows());
-      intermediate_merge_batch.Reset();
-    }
+    RETURN_IF_ERROR(merge_output_run_->Init());
+    RETURN_IF_ERROR(CreateMerger(num_runs_to_merge));
 
-    RETURN_IF_ERROR(merged_run->FinalizeInput());
+    // If CreateMerger() consumed all the sorted runs, we have set up the final merge.
+    if (sorted_runs_.empty()) {
+      // Don't need intermediate run for final merge.
+      if (merge_output_run_ != NULL) {
+        merge_output_run_->DeleteAllBlocks();
+        merge_output_run_ = NULL;
+      }
+      return Status::OK();
+    }
+    RETURN_IF_ERROR(ExecuteIntermediateMerge(merge_output_run_));
+    sorted_runs_.push_back(merge_output_run_);
+    merge_output_run_ = NULL;
   }
-
   return Status::OK();
 }
 
-Status Sorter::CreateMerger(int num_runs) {
-  DCHECK_GT(num_runs, 1);
+Status Sorter::CreateMerger(int max_num_runs) {
+  DCHECK_GE(max_num_runs, 2);
+  DCHECK_GE(sorted_runs_.size(), 2);
   // Clean up the runs from the previous merge.
   Run::CleanupRuns(&merging_runs_);
 
@@ -1543,10 +1534,24 @@ Status Sorter::CreateMerger(int num_runs) {
       new SortedRunMerger(compare_less_than_, output_row_desc_, profile_, true));
 
   vector<function<Status (RowBatch**)> > merge_runs;
-  merge_runs.reserve(num_runs);
-  for (int i = 0; i < num_runs; ++i) {
+  merge_runs.reserve(max_num_runs);
+  for (int i = 0; i < max_num_runs; ++i) {
     Run* run = sorted_runs_.front();
-    RETURN_IF_ERROR(run->PrepareRead());
+    bool success;
+    RETURN_IF_ERROR(run->PrepareRead(&success));
+    if (!success) {
+      // If we can merge at least two runs, we can continue, otherwise we have a problem
+      // because we can't make progress on the merging.
+      // TODO: IMPALA-3200: we should not need this logic once we have reliable
+      // reservations (IMPALA-3200).
+      if (merging_runs_.size() < 2) {
+        Status status = Status::MemLimitExceeded();
+        status.AddDetail(Substitute(MERGE_FAILED_ERROR_MSG, merging_runs_.size()));
+        return status;
+      }
+      // Merge the runs that we were able to prepare.
+      break;
+    }
     // Run::GetNextBatch() is used by the merger to retrieve a batch of rows to merge
     // from this run.
     merge_runs.push_back(bind<Status>(mem_fn(&Run::GetNextBatch), run, _1));
@@ -1559,4 +1564,25 @@ Status Sorter::CreateMerger(int num_runs) {
   return Status::OK();
 }
 
+
+Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) {
+  RowBatch intermediate_merge_batch(*output_row_desc_, state_->batch_size(),
+      mem_tracker_);
+  bool eos = false;
+  while (!eos) {
+    // Copy rows into the new run until done.
+    int num_copied;
+    RETURN_IF_CANCELLED(state_);
+    RETURN_IF_ERROR(merger_->GetNext(&intermediate_merge_batch, &eos));
+    RETURN_IF_ERROR(
+        merged_run->AddIntermediateBatch(&intermediate_merge_batch, 0, &num_copied));
+
+    DCHECK_EQ(num_copied, intermediate_merge_batch.num_rows());
+    intermediate_merge_batch.Reset();
+  }
+
+  RETURN_IF_ERROR(merged_run->FinalizeInput());
+  return Status::OK();
+}
+
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee53ddb3/be/src/runtime/sorter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index 16b55c2..2b76fa9 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -124,20 +124,28 @@ class Sorter {
   class TupleIterator;
   class TupleSorter;
 
-  /// Create a SortedRunMerger from the first 'num_runs' sorted runs in sorted_runs_ and
-  /// assign it to merger_. The runs to be merged are removed from sorted_runs_.
-  /// The Sorter sets the deep_copy_input flag to true for the merger, since the blocks
-  /// containing input run data will be unpinned as input runs are read.
-  Status CreateMerger(int num_runs);
+  /// Create a SortedRunMerger from sorted runs in 'sorted_runs_' and assign it to
+  /// 'merger_'. Attempts to set up merger with 'max_num_runs' runs but may set it
+  /// up with fewer if it cannot pin the initial blocks of all of the runs. Fails
+  /// if it cannot merge at least two runs. The runs to be merged are removed from
+  /// 'sorted_runs_'.  The Sorter sets the 'deep_copy_input' flag to true for the
+  /// merger, since the blocks containing input run data will be deleted as input
+  /// runs are read.
+  Status CreateMerger(int max_num_runs);
 
   /// Repeatedly replaces multiple smaller runs in sorted_runs_ with a single larger
-  /// merged run until the number of remaining runs is small enough for a single merge.
+  /// merged run until there are few enough runs to be merged with a single merger.
+  /// Returns when 'merger_' is set up to merge the final runs.
   /// At least 1 (2 if var-len slots) block from each sorted run must be pinned for
   /// a merge. If the number of sorted runs is too large, merge sets of smaller runs
   /// into large runs until a final merge can be performed. An intermediate row batch
   /// containing deep copied rows is used for the output of each intermediate merge.
   Status MergeIntermediateRuns();
 
+  /// Execute a single step of the intermediate merge, pulling rows from 'merger_'
+  /// and adding them to 'merged_run'.
+  Status ExecuteIntermediateMerge(Sorter::Run* merged_run);
+
   /// Called once there no more rows to be added to 'unsorted_run_'. Sorts
   /// 'unsorted_run_' and appends it to the list of sorted runs.
   Status SortCurrentInputRun();
@@ -195,6 +203,10 @@ class Sorter {
   /// These runs can be deleted when we are done with the current merge.
   std::deque<Run*> merging_runs_;
 
+  /// Output run for the merge. Stored in Sorter() so that it can be cleaned up
+  /// in Sorter::Close() in case of errors.
+  Run* merge_output_run_;
+
   /// Pool of owned Run objects. Maintains Runs objects across non-freeing Reset() calls.
   ObjectPool obj_pool_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee53ddb3/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
index 02feb31..b31620f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
@@ -1585,15 +1585,16 @@ int, bigint, bigint, double
 ---- QUERY
 # Regression test for IMPALA-2265, IMPALA-2559. The max_block_mgr_memory is tuned to
 # reproduce the issue when running this query against functional_parquet.
-SET max_block_mgr_memory=36m;
+SET max_block_mgr_memory=16m;
 SELECT lag(-180, 13) over (ORDER BY t1.int_col ASC, t2.int_col ASC) AS int_col
 FROM functional_parquet.alltypes t1 CROSS JOIN functional_parquet.alltypes t2 LIMIT 10;
 ---- CATCH
 Memory limit exceeded
 ====
 ---- QUERY
-# Check that the above query can succeed with more buffers.
-SET max_block_mgr_memory=64m;
+# Check that the above query can succeed with the minimum buffers (3 buffers for sort,
+# 1 buffer for analytic).
+SET max_block_mgr_memory=32m;
 SELECT lag(-180, 13) over (ORDER BY t1.int_col ASC, t2.int_col ASC) AS int_col
 FROM functional_parquet.alltypes t1 CROSS JOIN functional_parquet.alltypes t2 LIMIT 10;
 ---- TYPES

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee53ddb3/testdata/workloads/functional-query/queries/QueryTest/spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index f462435..e97afc5 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -589,3 +589,42 @@ STRING
 row_regex: .*SpilledRuns: .* \([1-9][0-9]*\)
 row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\)
 ====
+---- QUERY
+# IMPALA-1346/IMPALA-1546: fix sorter memory management so that it can complete
+# successfully when in same pipeline as a spilling join.
+set num_nodes=0;
+set mem_limit=200m;
+set max_block_mgr_memory=50m;
+set disable_outermost_topn=1;
+select * from lineitem
+  inner join orders on l_orderkey = o_orderkey
+order by l_linenumber, l_suppkey, l_partkey, l_orderkey
+limit 20
+---- RESULTS
+4567296,2500,1,1,48.00,67320.00,0.06,0.05,'N','O','1997-05-15','1997-05-19','1997-05-27','DELIVER
 IN PERSON','REG AIR','ccounts cajole quickly 
',4567296,100420,'O',113399.69,'1997-02-21','1-URGENT','Clerk#000000779',0,'ously
 ironic instructions. pa'
+5427587,2500,1,1,40.00,56100.00,0.05,0.07,'N','O','1997-04-01','1997-05-22','1997-04-29','TAKE
 BACK RETURN','RAIL','oxes wake even theodolites: bold 
requests',5427587,110356,'O',182983.43,'1997-03-10','1-URGENT','Clerk#000000279',0,'osits
 wake along the ca'
+3834597,7500,1,1,3.00,4222.50,0.03,0.01,'N','O','1998-07-03','1998-05-12','1998-07-04','COLLECT
 COD','AIR','sly final instructions boost 
about',3834597,29839,'O',147632.36,'1998-03-23','1-URGENT','Clerk#000000284',0,'le
 carefully blithel'
+4888512,10000,1,1,14.00,12740.00,0.03,0.01,'A','F','1993-02-07','1992-12-29','1993-02-19','NONE','SHIP','al
 braids. unusual, silent sentiments 
c',4888512,93742,'F',12481.37,'1992-11-21','4-NOT 
SPECIFIED','Clerk#000000591',0,' requests hinder blithely. closely ironic 
theodolites cajole among the car'
+693345,12497,1,1,14.00,19732.86,0.03,0.07,'N','O','1998-10-12','1998-08-18','1998-10-23','NONE','FOB','.
 blithely',693345,49472,'O',111825.01,'1998-07-06','4-NOT 
SPECIFIED','Clerk#000000132',0,'es wake regularly furiously pending orbits. 
quickly even requests according t'
+710784,12497,1,1,11.00,15504.39,0.02,0.07,'R','F','1992-08-21','1992-08-11','1992-09-05','TAKE
 BACK RETURN','SHIP','haggle furiously special accounts? final 
th',710784,82009,'F',91959.19,'1992-06-20','1-URGENT','Clerk#000000001',0,'rve 
quickly after the express theodolites. furiou'
+1240672,12497,1,1,41.00,57789.09,0.08,0.05,'N','O','1998-09-18','1998-08-04','1998-09-24','NONE','RAIL','o
 the furiously unusual requests sleep 
alo',1240672,59153,'O',55824.25,'1998-05-28','1-URGENT','Clerk#000000657',0,'ular
 pinto beans are above the furiously regular accounts: furiously even foxe'
+2024230,12497,1,1,16.00,22551.84,0.05,0.00,'A','F','1992-04-10','1992-04-14','1992-04-13','NONE','RAIL','pecial
 theodolites wake slyly. 
care',2024230,145955,'F',186715.47,'1992-02-22','5-LOW','Clerk#000000904',0,'uses.
 accounts are furiously. fluffily regular ideas haggle b'
+3039715,12497,1,1,14.00,19732.86,0.01,0.02,'R','F','1993-03-24','1993-02-12','1993-03-28','TAKE
 BACK RETURN','AIR','onic requests. furiously 
spe',3039715,27274,'F',80817.94,'1992-11-23','2-HIGH','Clerk#000000766',0,'lently
 regular packages believe slyly around the regular, regula'
+3105635,12497,1,1,16.00,22551.84,0.03,0.07,'R','F','1993-07-30','1993-08-28','1993-08-19','TAKE
 BACK RETURN','SHIP','nal, ironic theodolites solve 
carefully',3105635,23665,'F',256886.23,'1993-06-22','5-LOW','Clerk#000000029',0,'otes--
 blithely special accounts cajole slyly furiously silent dugout'
+3764485,12497,1,1,22.00,31008.78,0.05,0.08,'R','F','1993-02-07','1993-03-18','1993-02-10','COLLECT
 COD','AIR',' pinto beans 
nag',3764485,19375,'F',107449.65,'1992-12-26','5-LOW','Clerk#000000364',0,' 
carefully regular foxes bo'
+4767393,12497,1,1,3.00,4228.47,0.09,0.06,'A','F','1992-09-15','1992-10-02','1992-09-17','DELIVER
 IN PERSON','FOB',' asymptotes. 
blithely',4767393,22522,'F',163054.32,'1992-08-10','2-HIGH','Clerk#000000174',0,'unusual
 foxes after the furiously regular multipliers detect ca'
+1955428,14998,1,1,47.00,89910.53,0.07,0.02,'R','F','1993-01-25','1993-04-02','1993-01-31','DELIVER
 IN PERSON','MAIL','thely regular 
frays',1955428,41801,'F',134123.84,'1993-01-08','4-NOT 
SPECIFIED','Clerk#000000422',0,'ld deposits are slyly. quickly bold ideas bo'
+4197636,14998,1,1,6.00,11477.94,0.03,0.01,'A','F','1993-11-28','1993-11-27','1993-12-03','TAKE
 BACK RETURN','SHIP','ly. slyly final 
ex',4197636,77494,'F',283248.52,'1993-10-18','1-URGENT','Clerk#000000087',0,'final
 asymptotes? packages doze against the ironic packages. ironic, bold dec'
+4899558,17499,1,1,12.00,16997.88,0.00,0.08,'A','F','1992-08-06','1992-08-09','1992-08-14','COLLECT
 COD','RAIL','ourts would sleep fluffily express 
accou',4899558,111973,'F',300951.70,'1992-05-21','1-URGENT','Clerk#000000826',0,'eodolites
 wake ironic sentiments. r'
+4725760,20000,1,1,48.00,44160.00,0.02,0.05,'R','F','1992-03-06','1992-03-04','1992-03-20','NONE','SHIP','liers.
 slyly regular request',4725760,45176,'F',202199.68,'1992-01-31','4-NOT 
SPECIFIED','Clerk#000000853',0,'riously regular accounts. ironic, bold requests 
was slyly; slyly regula'
+49444,22494,1,1,22.00,31162.78,0.06,0.08,'N','O','1997-03-03','1997-04-19','1997-03-25','TAKE
 BACK RETURN','MAIL','l requests among the blithely 
fin',49444,11725,'O',273794.80,'1997-01-23','2-HIGH','Clerk#000000814',0,'s. 
quickly bold packages integrate. furio'
+396839,24996,1,1,9.00,17288.91,0.03,0.07,'N','O','1996-06-19','1996-04-27','1996-07-13','COLLECT
 COD','MAIL',' accounts 
',396839,63389,'O',193790.34,'1996-03-11','5-LOW','Clerk#000000436',0,'eans. 
instructions are quickly--'
+1109894,24996,1,1,22.00,42261.78,0.01,0.05,'A','F','1995-01-10','1995-03-01','1995-01-21','TAKE
 BACK RETURN','RAIL','nusual requests wake. 
qu',1109894,35489,'F',78070.57,'1994-12-23','2-HIGH','Clerk#000000524',0,' bold 
deposits engage fluffily among the s'
+5825859,24996,1,1,5.00,9604.95,0.02,0.00,'A','F','1995-05-05','1995-03-17','1995-05-21','TAKE
 BACK RETURN','MAIL','y special 
a',5825859,13285,'F',90407.81,'1995-01-15','3-MEDIUM','Clerk#000000818',0,'ajole.
 quickly ironic theodolites '
+---- TYPES
+BIGINT,BIGINT,BIGINT,INT,DECIMAL,DECIMAL,DECIMAL,DECIMAL,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,BIGINT,BIGINT,STRING,DECIMAL,STRING,STRING,STRING,INT,STRING
+---- RUNTIME_PROFILE
+# Verify that the sort and join actually spilled
+row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
+row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\)
+====
