IMPALA-5446: dropped Sorter::Reset() status. This patch aligns the sorter's methods more closely with the ExecNode methods and moves the possibly-failing parts of Reset() into Open().
Testing: Added WARN_UNUSED_RESULT to all the sorter methods that return Status to prevent similar issues in future. Add a test that sometimes goes down this code path. It was able to cause a crash at least once every 5 executions. Ran an exhaustive build to make sure there were no other regressions. Change-Id: I7d4f9e93a44531901e663b3f1e18edc514363f74 Reviewed-on: http://gerrit.cloudera.org:8080/7134 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7a0ee685 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7a0ee685 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7a0ee685 Branch: refs/heads/master Commit: 7a0ee685b846b4300c00eb2dc99c6f9c462bf66e Parents: 58baca7 Author: Tim Armstrong <[email protected]> Authored: Thu Jun 8 16:38:22 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Tue Jun 13 05:13:44 2017 +0000 ---------------------------------------------------------------------- be/src/exec/sort-node.cc | 3 ++- be/src/runtime/sorter.cc | 19 ++++++++------- be/src/runtime/sorter.h | 25 +++++++++++--------- .../queries/QueryTest/nested-types-tpch.test | 17 +++++++++++++ 4 files changed, 43 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a0ee685/be/src/exec/sort-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc index 1a8de83..9660ed3 100644 --- a/be/src/exec/sort-node.cc +++ b/be/src/exec/sort-node.cc @@ -54,7 +54,7 @@ Status SortNode::Prepare(RuntimeState* state) { sorter_.reset( new Sorter(*less_than_.get(), sort_exec_exprs_.sort_tuple_slot_expr_ctxs(), &row_descriptor_, mem_tracker(), runtime_profile(), state)); - 
RETURN_IF_ERROR(sorter_->Init()); + RETURN_IF_ERROR(sorter_->Prepare()); AddCodegenDisabledMessage(state); return Status::OK(); } @@ -75,6 +75,7 @@ Status SortNode::Open(RuntimeState* state) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); RETURN_IF_ERROR(child(0)->Open(state)); + RETURN_IF_ERROR(sorter_->Open()); // The child has been opened and the sorter created. Sort the input. // The final merge is done on-demand as rows are requested in GetNext(). http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a0ee685/be/src/runtime/sorter.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc index 1acdf15..0d41ec4 100644 --- a/be/src/runtime/sorter.cc +++ b/be/src/runtime/sorter.cc @@ -1362,13 +1362,12 @@ Sorter::~Sorter() { DCHECK(merge_output_run_ == NULL); } -Status Sorter::Init() { - DCHECK(unsorted_run_ == NULL) << "Already initialized"; +Status Sorter::Prepare() { + DCHECK(in_mem_tuple_sorter_ == NULL) << "Already prepared"; TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0]; has_var_len_slots_ = sort_tuple_desc->HasVarlenSlots(); in_mem_tuple_sorter_.reset(new TupleSorter(compare_less_than_, block_mgr_->max_block_size(), sort_tuple_desc->byte_size(), state_)); - unsorted_run_ = obj_pool_.Add(new Run(this, sort_tuple_desc, true)); initial_runs_counter_ = ADD_COUNTER(profile_, "InitialRunsCreated", TUnit::UNIT); spilled_runs_counter_ = ADD_COUNTER(profile_, "SpilledRuns", TUnit::UNIT); @@ -1384,8 +1383,14 @@ Status Sorter::Init() { RETURN_IF_ERROR(block_mgr_->RegisterClient(Substitute("Sorter ptr=$0", this), min_buffers_required, false, mem_tracker_, state_, &block_mgr_client_)); + return Status::OK(); +} - DCHECK(unsorted_run_ != NULL); +Status Sorter::Open() { + DCHECK(in_mem_tuple_sorter_ != NULL) << "Not prepared"; + DCHECK(unsorted_run_ == NULL) << "Already open"; + TupleDescriptor* sort_tuple_desc = 
output_row_desc_->tuple_descriptors()[0]; + unsorted_run_ = obj_pool_.Add(new Run(this, sort_tuple_desc, true)); RETURN_IF_ERROR(unsorted_run_->Init()); return Status::OK(); } @@ -1446,16 +1451,12 @@ Status Sorter::GetNext(RowBatch* output_batch, bool* eos) { } } -Status Sorter::Reset() { +void Sorter::Reset() { DCHECK(unsorted_run_ == NULL) << "Cannot Reset() before calling InputDone()"; merger_.reset(); // Free resources from the current runs. CleanupAllRuns(); obj_pool_.Clear(); - unsorted_run_ = obj_pool_.Add( - new Run(this, output_row_desc_->tuple_descriptors()[0], true)); - RETURN_IF_ERROR(unsorted_run_->Init()); - return Status::OK(); } void Sorter::Close() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a0ee685/be/src/runtime/sorter.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h index 54e2d22..d91afa8 100644 --- a/be/src/runtime/sorter.h +++ b/be/src/runtime/sorter.h @@ -100,24 +100,27 @@ class Sorter { ~Sorter(); - /// Initialization code, including registration to the block_mgr and the initialization - /// of the unsorted_run_, both of these may fail. - Status Init(); + /// Initial set-up of the sorter for execution. Registers with the block mgr. + Status Prepare() WARN_UNUSED_RESULT; + + /// Open the sorter for adding rows. Must be called after Prepare() or Reset() and + /// before calling AddBatch(). + Status Open() WARN_UNUSED_RESULT; /// Adds a batch of input rows to the current unsorted run. - Status AddBatch(RowBatch* batch); + Status AddBatch(RowBatch* batch) WARN_UNUSED_RESULT; /// Called to indicate there is no more input. Triggers the creation of merger(s) if /// necessary. - Status InputDone(); + Status InputDone() WARN_UNUSED_RESULT; /// Get the next batch of sorted output rows from the sorter. 
- Status GetNext(RowBatch* batch, bool* eos); + Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT; /// Resets all internal state like ExecNode::Reset(). /// Init() must have been called, AddBatch()/GetNext()/InputDone() /// may or may not have been called. - Status Reset(); + void Reset(); /// Close the Sorter and free resources. void Close(); @@ -134,7 +137,7 @@ class Sorter { /// 'sorted_runs_'. The Sorter sets the 'deep_copy_input' flag to true for the /// merger, since the blocks containing input run data will be deleted as input /// runs are read. - Status CreateMerger(int max_num_runs); + Status CreateMerger(int max_num_runs) WARN_UNUSED_RESULT; /// Repeatedly replaces multiple smaller runs in sorted_runs_ with a single larger /// merged run until there are few enough runs to be merged with a single merger. @@ -143,15 +146,15 @@ class Sorter { /// a merge. If the number of sorted runs is too large, merge sets of smaller runs /// into large runs until a final merge can be performed. An intermediate row batch /// containing deep copied rows is used for the output of each intermediate merge. - Status MergeIntermediateRuns(); + Status MergeIntermediateRuns() WARN_UNUSED_RESULT; /// Execute a single step of the intermediate merge, pulling rows from 'merger_' /// and adding them to 'merged_run'. - Status ExecuteIntermediateMerge(Sorter::Run* merged_run); + Status ExecuteIntermediateMerge(Sorter::Run* merged_run) WARN_UNUSED_RESULT; /// Called once there no more rows to be added to 'unsorted_run_'. Sorts /// 'unsorted_run_' and appends it to the list of sorted runs. - Status SortCurrentInputRun(); + Status SortCurrentInputRun() WARN_UNUSED_RESULT; /// Helper that cleans up all runs in the sorter. 
void CleanupAllRuns(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a0ee685/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test index 5ace4f4..626b315 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test +++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test @@ -233,3 +233,20 @@ order by c_custkey ---- TYPES bigint, bigint ==== +---- QUERY +# IMPALA-5446: dropped status from Sorter::Reset() when sort cannot get reserved buffer. +# This query is designed to allow the initial subplan iterations to succeed, but have +# later iterations fail because the aggregation outside the subplan has accumulated all +# the memory. +set max_block_mgr_memory=100m; +select c_custkey, c_name, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, + o_orderdate, sum(o_totalprice), min(rnum) +from customer c, + (select o_orderkey, o_totalprice, o_orderdate, row_number() over (order by o_orderdate desc) rnum + from c.c_orders) v +group by 1, 2, 3, 4, 5, 6, 7, 8 +order by 9, 10 desc +limit 10 +---- CATCH +Memory limit exceeded: Query did not have enough memory to get the minimum required buffers in the block manager. +====
