IMPALA-5446: dropped Sorter::Reset() status

This patch aligns the sorter's methods closer with the ExecNode methods
and moves the possibly-failing parts of Reset() into Open().

Testing:
Added WARN_UNUSED_RESULT to all the sorter methods that return Status to
prevent similar issues in future.

Added a test that sometimes goes down this code path. Before this fix, it
caused a crash at least once in every 5 executions.

Ran an exhaustive build to make sure there were no other regressions.

Change-Id: I7d4f9e93a44531901e663b3f1e18edc514363f74
Reviewed-on: http://gerrit.cloudera.org:8080/7134
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7a0ee685
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7a0ee685
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7a0ee685

Branch: refs/heads/master
Commit: 7a0ee685b846b4300c00eb2dc99c6f9c462bf66e
Parents: 58baca7
Author: Tim Armstrong <[email protected]>
Authored: Thu Jun 8 16:38:22 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Jun 13 05:13:44 2017 +0000

----------------------------------------------------------------------
 be/src/exec/sort-node.cc                        |  3 ++-
 be/src/runtime/sorter.cc                        | 19 ++++++++-------
 be/src/runtime/sorter.h                         | 25 +++++++++++---------
 .../queries/QueryTest/nested-types-tpch.test    | 17 +++++++++++++
 4 files changed, 43 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a0ee685/be/src/exec/sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 1a8de83..9660ed3 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -54,7 +54,7 @@ Status SortNode::Prepare(RuntimeState* state) {
   sorter_.reset(
       new Sorter(*less_than_.get(), sort_exec_exprs_.sort_tuple_slot_expr_ctxs(),
           &row_descriptor_, mem_tracker(), runtime_profile(), state));
-  RETURN_IF_ERROR(sorter_->Init());
+  RETURN_IF_ERROR(sorter_->Prepare());
   AddCodegenDisabledMessage(state);
   return Status::OK();
 }
@@ -75,6 +75,7 @@ Status SortNode::Open(RuntimeState* state) {
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
   RETURN_IF_ERROR(child(0)->Open(state));
+  RETURN_IF_ERROR(sorter_->Open());
 
   // The child has been opened and the sorter created. Sort the input.
   // The final merge is done on-demand as rows are requested in GetNext().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a0ee685/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 1acdf15..0d41ec4 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -1362,13 +1362,12 @@ Sorter::~Sorter() {
   DCHECK(merge_output_run_ == NULL);
 }
 
-Status Sorter::Init() {
-  DCHECK(unsorted_run_ == NULL) << "Already initialized";
+Status Sorter::Prepare() {
+  DCHECK(in_mem_tuple_sorter_ == NULL) << "Already prepared";
   TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0];
   has_var_len_slots_ = sort_tuple_desc->HasVarlenSlots();
   in_mem_tuple_sorter_.reset(new TupleSorter(compare_less_than_,
       block_mgr_->max_block_size(), sort_tuple_desc->byte_size(), state_));
-  unsorted_run_ = obj_pool_.Add(new Run(this, sort_tuple_desc, true));
 
   initial_runs_counter_ = ADD_COUNTER(profile_, "InitialRunsCreated", TUnit::UNIT);
   spilled_runs_counter_ = ADD_COUNTER(profile_, "SpilledRuns", TUnit::UNIT);
@@ -1384,8 +1383,14 @@ Status Sorter::Init() {
 
   RETURN_IF_ERROR(block_mgr_->RegisterClient(Substitute("Sorter ptr=$0", this),
       min_buffers_required, false, mem_tracker_, state_, &block_mgr_client_));
+  return Status::OK();
+}
 
-  DCHECK(unsorted_run_ != NULL);
+Status Sorter::Open() {
+  DCHECK(in_mem_tuple_sorter_ != NULL) << "Not prepared";
+  DCHECK(unsorted_run_ == NULL) << "Already open";
+  TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0];
+  unsorted_run_ = obj_pool_.Add(new Run(this, sort_tuple_desc, true));
   RETURN_IF_ERROR(unsorted_run_->Init());
   return Status::OK();
 }
@@ -1446,16 +1451,12 @@ Status Sorter::GetNext(RowBatch* output_batch, bool* eos) {
   }
 }
 
-Status Sorter::Reset() {
+void Sorter::Reset() {
   DCHECK(unsorted_run_ == NULL) << "Cannot Reset() before calling InputDone()";
   merger_.reset();
   // Free resources from the current runs.
   CleanupAllRuns();
   obj_pool_.Clear();
-  unsorted_run_ = obj_pool_.Add(
-      new Run(this, output_row_desc_->tuple_descriptors()[0], true));
-  RETURN_IF_ERROR(unsorted_run_->Init());
-  return Status::OK();
 }
 
 void Sorter::Close() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a0ee685/be/src/runtime/sorter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index 54e2d22..d91afa8 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -100,24 +100,27 @@ class Sorter {
 
   ~Sorter();
 
-  /// Initialization code, including registration to the block_mgr and the initialization
-  /// of the unsorted_run_, both of these may fail.
-  Status Init();
+  /// Initial set-up of the sorter for execution. Registers with the block mgr.
+  Status Prepare() WARN_UNUSED_RESULT;
+
+  /// Open the sorter for adding rows. Must be called after Prepare() or Reset() and
+  /// before calling AddBatch().
+  Status Open() WARN_UNUSED_RESULT;
 
   /// Adds a batch of input rows to the current unsorted run.
-  Status AddBatch(RowBatch* batch);
+  Status AddBatch(RowBatch* batch) WARN_UNUSED_RESULT;
 
   /// Called to indicate there is no more input. Triggers the creation of merger(s) if
   /// necessary.
-  Status InputDone();
+  Status InputDone() WARN_UNUSED_RESULT;
 
   /// Get the next batch of sorted output rows from the sorter.
-  Status GetNext(RowBatch* batch, bool* eos);
+  Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT;
 
   /// Resets all internal state like ExecNode::Reset().
   /// Init() must have been called, AddBatch()/GetNext()/InputDone()
   /// may or may not have been called.
-  Status Reset();
+  void Reset();
 
   /// Close the Sorter and free resources.
   void Close();
@@ -134,7 +137,7 @@ class Sorter {
   /// 'sorted_runs_'.  The Sorter sets the 'deep_copy_input' flag to true for the
   /// merger, since the blocks containing input run data will be deleted as input
   /// runs are read.
-  Status CreateMerger(int max_num_runs);
+  Status CreateMerger(int max_num_runs) WARN_UNUSED_RESULT;
 
   /// Repeatedly replaces multiple smaller runs in sorted_runs_ with a single larger
   /// merged run until there are few enough runs to be merged with a single merger.
@@ -143,15 +146,15 @@ class Sorter {
   /// a merge. If the number of sorted runs is too large, merge sets of smaller runs
   /// into large runs until a final merge can be performed. An intermediate row batch
   /// containing deep copied rows is used for the output of each intermediate merge.
-  Status MergeIntermediateRuns();
+  Status MergeIntermediateRuns() WARN_UNUSED_RESULT;
 
   /// Execute a single step of the intermediate merge, pulling rows from 'merger_'
   /// and adding them to 'merged_run'.
-  Status ExecuteIntermediateMerge(Sorter::Run* merged_run);
+  Status ExecuteIntermediateMerge(Sorter::Run* merged_run) WARN_UNUSED_RESULT;
 
   /// Called once there no more rows to be added to 'unsorted_run_'. Sorts
   /// 'unsorted_run_' and appends it to the list of sorted runs.
-  Status SortCurrentInputRun();
+  Status SortCurrentInputRun() WARN_UNUSED_RESULT;
 
   /// Helper that cleans up all runs in the sorter.
   void CleanupAllRuns();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a0ee685/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test
index 5ace4f4..626b315 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test
@@ -233,3 +233,20 @@ order by c_custkey
 ---- TYPES
 bigint, bigint
 ====
+---- QUERY
+# IMPALA-5446: dropped status from Sorter::Reset() when sort cannot get reserved buffer.
+# This query is designed to allow the initial subplan iterations to succeed, but have
+# later iterations fail because the aggregation outside the subplan has accumulated all
+# the memory.
+set max_block_mgr_memory=100m;
+select c_custkey, c_name, c_address, c_phone, c_acctbal, c_mktsegment, c_comment,
+       o_orderdate, sum(o_totalprice), min(rnum)
+from customer c,
+  (select o_orderkey, o_totalprice, o_orderdate, row_number() over (order by o_orderdate desc) rnum
+   from c.c_orders) v
+group by 1, 2, 3, 4, 5, 6, 7, 8
+order by 9, 10 desc
+limit 10
+---- CATCH
+Memory limit exceeded: Query did not have enough memory to get the minimum required buffers in the block manager.
+====

Reply via email to