Repository: incubator-impala
Updated Branches:
  refs/heads/master a1b035a25 -> d23e5505c


IMPALA-3670: fix sorter buffer mgmt bugs

Also make test_scratch_disk.py more deterministic by using
max_block_mgr_memory, which doesn't include scanner memory.
The fixed test_scratch_disk.py exercises the other sorter bug, which
occurs when scratch cannot be written.

Testing:
Added a test that does a sort with various memory limits and consumes
the whole output of the sorter (we have many tests of sorts with LIMIT
clauses, but limited coverage of sorts that return their full output).
Ran an exhaustive test run before posting for review.

This added test reproduced one of the sorter bugs, where var-len blocks
were not always attached to the output batch. The other bug was
reproduced by the test change in IMPALA-3669: test_scratch_disk fix.
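
For illustration, here is a minimal standalone sketch of the fixed eos
handling. Block, RowBatch and AttachVarLenBlocksAtEos below are
hypothetical reductions of Impala's BufferedBlockMgr::Block, RowBatch
and Sorter::Run::GetNext(), not the real interfaces; the point is only
the shape of the fix, which hands every remaining var-len block to the
output batch instead of just the one under the read cursor:

  #include <cassert>
  #include <iostream>
  #include <vector>

  // Hypothetical stand-ins for Impala's BufferedBlockMgr::Block and RowBatch.
  struct Block { int id; };
  struct RowBatch {
    std::vector<Block*> attached_blocks;
    void AddBlock(Block* b) { attached_blocks.push_back(b); }
  };

  // Sketch of the corrected eos path: once the final fixed-len block has been
  // attached, transfer ownership of all remaining var-len blocks, not just
  // the one at var_len_blocks_index_ (the bug fixed by this patch).
  void AttachVarLenBlocksAtEos(std::vector<Block*>* var_len_blocks,
                               RowBatch* output_batch) {
    for (Block* var_len_block : *var_len_blocks) {
      assert(var_len_block != nullptr);  // Mirrors the DCHECK in the patch.
      output_batch->AddBlock(var_len_block);
    }
    var_len_blocks->clear();  // The run no longer owns the blocks.
  }

  int main() {
    Block b0{0}, b1{1}, b2{2};
    std::vector<Block*> var_len_blocks = {&b0, &b1, &b2};
    RowBatch batch;
    AttachVarLenBlocksAtEos(&var_len_blocks, &batch);
    std::cout << batch.attached_blocks.size() << " blocks attached\n";  // 3
    return 0;
  }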

Change-Id: Ia1a0ddffa0a5b157ab86a376b7b7360a923698d6
Reviewed-on: http://gerrit.cloudera.org:8080/3315
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Tim Armstrong <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d23e5505
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d23e5505
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d23e5505

Branch: refs/heads/master
Commit: d23e5505c81a793e931b7070b9bebfddee35f430
Parents: a1b035a
Author: Tim Armstrong <[email protected]>
Authored: Fri Jun 3 14:19:12 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Mon Jun 6 22:34:19 2016 -0700

----------------------------------------------------------------------
 be/src/runtime/sorter.cc      | 47 +++++++++++++++++++-------------------
 tests/query_test/test_sort.py | 23 +++++++++++++++++--
 2 files changed, 45 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d23e5505/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 2e9e6ec..627f0e1 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -670,6 +670,7 @@ Status Sorter::Run::UnpinAllBlocks() {
   // A list of var len blocks to replace 'var_len_blocks_'. Note that after we are done
   // we may have a different number of blocks, because internal fragmentation may leave
   // slightly different amounts of wasted space at the end of each block.
+  // We need to be careful to clean up these blocks if we run into an error in this method.
   vector<BufferedBlockMgr::Block*> sorted_var_len_blocks;
   sorted_var_len_blocks.reserve(var_len_blocks_.size());
 
@@ -692,6 +693,7 @@ Status Sorter::Run::UnpinAllBlocks() {
     DCHECK(var_len_copy_block_ == NULL);
   }
 
+  Status status;
   for (int i = 0; i < fixed_len_blocks_.size(); ++i) {
     BufferedBlockMgr::Block* cur_fixed_block = fixed_len_blocks_[i];
     // Skip converting the pointers if no var-len slots, or if all the values are null
@@ -706,7 +708,8 @@ Status Sorter::Run::UnpinAllBlocks() {
         DCHECK(cur_sorted_var_len_block != NULL);
         if (cur_sorted_var_len_block->BytesRemaining() < total_var_len) {
           bool added;
-          RETURN_IF_ERROR(TryAddBlock(UNPIN_PREV, &sorted_var_len_blocks, &added));
+          status = TryAddBlock(UNPIN_PREV, &sorted_var_len_blocks, &added);
+          if (!status.ok()) goto cleanup_blocks;
           DCHECK(added) << "TryAddBlock() with UNPIN_PREV should not fail to add";
           cur_sorted_var_len_block = sorted_var_len_blocks.back();
         }
@@ -722,7 +725,8 @@ Status Sorter::Run::UnpinAllBlocks() {
 
   if (HasVarLenBlocks()) {
     DCHECK_GT(sorted_var_len_blocks.back()->valid_data_len(), 0);
-    RETURN_IF_ERROR(sorted_var_len_blocks.back()->Unpin());
+    status = sorted_var_len_blocks.back()->Unpin();
+    if (!status.ok()) goto cleanup_blocks;
   }
 
   // Clear var_len_blocks_ and replace it with the contents of sorted_var_len_blocks
@@ -731,6 +735,10 @@ Status Sorter::Run::UnpinAllBlocks() {
   is_pinned_ = false;
   sorter_->spilled_runs_counter_->Add(1);
   return Status::OK();
+
+cleanup_blocks:
+  DeleteAndClearBlocks(&sorted_var_len_blocks);
+  return status;
 }
 
 Status Sorter::Run::PrepareRead(bool* pinned_all_blocks) {
@@ -802,10 +810,9 @@ Status Sorter::Run::GetNextBatch(RowBatch** output_batch) {
 template <bool CONVERT_OFFSET_TO_PTR>
 Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
   // Var-len offsets are converted only when reading var-len data from unpinned runs.
-  if (HasVarLenBlocks()) DCHECK_EQ(!is_pinned_, CONVERT_OFFSET_TO_PTR);
   // We shouldn't convert var len offsets if there are no blocks, since in that case
   // they must all be null or zero-length strings, which don't point into a valid block.
-  if (!HasVarLenBlocks()) DCHECK(!CONVERT_OFFSET_TO_PTR);
+  DCHECK_EQ(CONVERT_OFFSET_TO_PTR, HasVarLenBlocks() && !is_pinned_);
 
   if (end_of_fixed_len_block_ &&
       fixed_len_blocks_index_ >= static_cast<int>(fixed_len_blocks_.size()) - 1) {
@@ -851,22 +858,14 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
         fixed_len_block->buffer() + fixed_len_block_offset_);
 
     if (CONVERT_OFFSET_TO_PTR && !ConvertOffsetsToPtrs(input_tuple)) {
+      DCHECK(!is_pinned_);
       // The var-len data is in the next block. We are done with the current block, so
       // return rows we've accumulated so far and advance to the next block in the next
-      // GetNext() call. This is needed for the unpinned case where we can't pin the next
-      // block before we delete the current block.
-      if (is_pinned_) {
-        // Attach block to batch. We don't need the block any more and we don't need to
-        // reclaim the block's memory, since we already either have the sorted data all in
-        // memory. The caller can delete the block when it wants to.
-        output_batch->AddBlock(var_len_blocks_[var_len_blocks_index_]);
-        var_len_blocks_[var_len_blocks_index_] = NULL;
-      } else {
-        // To iterate over unpinned runs, we need to exchange this block for the next
-        // in the next GetNext() call, so we need to hold onto the block and signal to
-        // the caller that the block is going to be deleted.
-        output_batch->MarkNeedToReturn();
-      }
+      // GetNext() call. This is needed for the unpinned case, where we exchange
+      // this block for the next one in the next GetNext() call: we must hold
+      // onto the current var-len block and signal to the caller that the block
+      // is going to be deleted.
+      output_batch->MarkNeedToReturn();
       end_of_var_len_block_ = true;
       break;
     }
@@ -883,11 +882,13 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
       output_batch->AddBlock(fixed_len_blocks_[fixed_len_blocks_index_]);
       fixed_len_blocks_[fixed_len_blocks_index_] = NULL;
 
-      // Also attach the last var-len block at eos, since it won't be attached elsewhere.
-      if (HasVarLenBlocks() &&
-          fixed_len_blocks_index_ == fixed_len_blocks_.size() - 1) {
-        output_batch->AddBlock(var_len_blocks_[var_len_blocks_index_]);
-        var_len_blocks_[var_len_blocks_index_] = NULL;
+      // Attach the var-len blocks at eos once no more rows will reference the blocks.
+      if (fixed_len_blocks_index_ == fixed_len_blocks_.size() - 1) {
+        for (BufferedBlockMgr::Block* var_len_block: var_len_blocks_) {
+          DCHECK(var_len_block != NULL);
+          output_batch->AddBlock(var_len_block);
+        }
+        var_len_blocks_.clear();
       }
     } else {
       // To iterate over unpinned runs, we need to exchange this block for the next

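The error-handling change in UnpinAllBlocks() above follows a classic
goto-cleanup shape. Below is a standalone sketch of that pattern under
simplified assumptions (Status, Block, TryAddBlock and
DeleteAndClearBlocks here are hypothetical stand-ins, not Impala's real
signatures): each fallible call stores its status and jumps to a single
cleanup label, so blocks accumulated in sorted_var_len_blocks are
released on every error path instead of being leaked by an early
RETURN_IF_ERROR:

  #include <iostream>
  #include <string>
  #include <vector>

  // Simplified stand-ins for Impala's Status and block types.
  struct Status {
    std::string msg;
    bool ok() const { return msg.empty(); }
    static Status OK() { return Status(); }
  };
  struct Block { int id; };

  // Hypothetical fallible allocator: fails once a small budget runs out.
  static int g_blocks_left = 2;
  Status TryAddBlock(std::vector<Block*>* blocks, bool* added) {
    if (g_blocks_left == 0) { *added = false; return Status{"out of blocks"}; }
    --g_blocks_left;
    blocks->push_back(new Block{static_cast<int>(blocks->size())});
    *added = true;
    return Status::OK();
  }

  void DeleteAndClearBlocks(std::vector<Block*>* blocks) {
    for (Block* b : *blocks) delete b;
    blocks->clear();
  }

  // Mirrors the control flow the patch gives UnpinAllBlocks(): on error, fall
  // through to one cleanup label instead of returning mid-loop with live blocks.
  Status UnpinAllBlocksSketch(int num_fixed_blocks) {
    std::vector<Block*> sorted_var_len_blocks;
    Status status;
    for (int i = 0; i < num_fixed_blocks; ++i) {
      bool added;
      status = TryAddBlock(&sorted_var_len_blocks, &added);
      if (!status.ok()) goto cleanup_blocks;
    }
    DeleteAndClearBlocks(&sorted_var_len_blocks);  // Success path (demo only).
    return Status::OK();

  cleanup_blocks:
    DeleteAndClearBlocks(&sorted_var_len_blocks);
    return status;
  }

  int main() {
    std::cout << UnpinAllBlocksSketch(5).msg << "\n";  // Prints "out of blocks".
    return 0;
  }
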
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d23e5505/tests/query_test/test_sort.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py
index 281957a..a83417e 100644
--- a/tests/query_test/test_sort.py
+++ b/tests/query_test/test_sort.py
@@ -61,6 +61,25 @@ class TestQueryFullSort(ImpalaTestSuite):
         query, exec_option, table_format=table_format).data)
       assert(result[0] == sorted(result[0]))
 
+  def test_multiple_mem_limits_full_output(self, vector):
+    """ Exercise a range of memory limits, returning the full sorted input. """
+    query = """select o_orderdate, o_custkey, o_comment
+      from orders
+      order by o_orderdate"""
+    exec_option = vector.get_value('exec_option')
+    table_format = vector.get_value('table_format')
+
+    max_block_mgr_memory_values = ['-1', '48M'] # Unlimited and minimum memory.
+    if self.exploration_strategy() == 'exhaustive' and \
+        table_format.file_format == 'parquet':
+      # Test some intermediate values for parquet on exhaustive.
+      max_block_mgr_memory_values += ['64M', '128M', '256M']
+    for max_block_mgr_memory in max_block_mgr_memory_values:
+      exec_option['max_block_mgr_memory'] = max_block_mgr_memory
+      result = transpose_results(self.execute_query(
+        query, exec_option, table_format=table_format).data)
+      assert(result[0] == sorted(result[0]))
+
   def test_sort_join(self, vector):
     """With 200m memory limit this should be a 2-phase sort"""
 
@@ -99,9 +118,9 @@ class TestQueryFullSort(ImpalaTestSuite):
     in the right partition.
     """
     query = """select l_orderkey from (
-      select * from tpch.lineitem limit 300000
+      select * from lineitem limit 300000
       union all
-      select * from tpch.lineitem limit 300000) t
+      select * from lineitem limit 300000) t
     order by l_orderkey"""
 
     exec_option = vector.get_value('exec_option')
