Repository: impala Updated Branches: refs/heads/master 4ab1b9245 -> ea615d1d8
Analytic mem mgmt cleanup Following on from IMPALA-7403, clean up some of the memory management in AnalyticEvalNode. Remove an unnecessary MemPool and avoid reallocating tuples unnecessarily in Reset(). Also switch to inline initialization of members. Testing: Ran exhaustive tests Change-Id: Id16beb2d0b9d4315f52dd45203649252c0eb06b0 Reviewed-on: http://gerrit.cloudera.org:8080/11203 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/884fcf81 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/884fcf81 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/884fcf81 Branch: refs/heads/master Commit: 884fcf81b8f0df949ee615c6b99694c01500da08 Parents: 4ab1b92 Author: Tim Armstrong <tarmstr...@cloudera.com> Authored: Tue Aug 7 14:17:10 2018 -0700 Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Committed: Tue Aug 14 01:37:44 2018 +0000 ---------------------------------------------------------------------- be/src/exec/analytic-eval-node.cc | 78 ++++++++++++++-------------------- be/src/exec/analytic-eval-node.h | 41 +++++++++--------- 2 files changed, 54 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/884fcf81/be/src/exec/analytic-eval-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc index 68170a2..036cf93 100644 --- a/be/src/exec/analytic-eval-node.cc +++ b/be/src/exec/analytic-eval-node.cc @@ -48,20 +48,7 @@ AnalyticEvalNode::AnalyticEvalNode( intermediate_tuple_desc_( descs.GetTupleDescriptor(tnode.analytic_node.intermediate_tuple_id)), result_tuple_desc_( - descs.GetTupleDescriptor(tnode.analytic_node.output_tuple_id)), - 
rows_start_offset_(0), - rows_end_offset_(0), - has_first_val_null_offset_(false), - first_val_null_offset_(0), - last_result_idx_(-1), - prev_pool_last_result_idx_(-1), - prev_pool_last_window_idx_(-1), - curr_tuple_(nullptr), - dummy_result_tuple_(nullptr), - curr_partition_idx_(-1), - input_stream_(nullptr), - input_eos_(false), - evaluation_timer_(nullptr) { + descs.GetTupleDescriptor(tnode.analytic_node.output_tuple_id)) { if (tnode.analytic_node.__isset.buffered_tuple_id) { buffered_tuple_desc_ = descs.GetTupleDescriptor( tnode.analytic_node.buffered_tuple_id); @@ -159,7 +146,6 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) { resource_profile_.spillable_buffer_size * MIN_REQUIRED_BUFFERS); curr_tuple_pool_.reset(new MemPool(mem_tracker())); prev_tuple_pool_.reset(new MemPool(mem_tracker())); - mem_pool_.reset(new MemPool(mem_tracker())); prev_input_tuple_pool_.reset(new MemPool(mem_tracker())); evaluation_timer_ = ADD_TIMER(runtime_profile(), "EvaluationTime"); @@ -176,6 +162,13 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) { RETURN_IF_ERROR(ScalarExprEvaluator::Create(*order_by_eq_expr_, state, pool_, expr_perm_pool(), expr_results_pool(), &order_by_eq_expr_eval_)); } + + // An intermediate tuple that is only allocated once and is reused. 'curr_tuple_' is + // initialized in Open() before it is used. + curr_tuple_ = + Tuple::Create(intermediate_tuple_desc_->byte_size(), expr_perm_pool_.get()); + dummy_result_tuple_ = + Tuple::Create(result_tuple_desc_->byte_size(), expr_perm_pool_.get()); return Status::OK(); } @@ -222,20 +215,20 @@ Status AnalyticEvalNode::Open(RuntimeState* state) { RETURN_IF_ERROR(order_by_eq_expr_eval_->Open(state)); } - // An intermediate tuple is only allocated once and is reused. - curr_tuple_ = Tuple::Create(intermediate_tuple_desc_->byte_size(), mem_pool_.get()); + // Initialize the tuple that was allocated in Prepare(). + // TODO: zeroing out curr_tuple_ shouldn't be strictly necessary. 
+ curr_tuple_->Init(intermediate_tuple_desc_->byte_size()); AggFnEvaluator::Init(analytic_fn_evals_, curr_tuple_); - // Allocate dummy_result_tuple_ even if AggFnEvaluator::Init() may have failed - // as it is needed in Close(). - // TODO: move this to Prepare() - dummy_result_tuple_ = Tuple::Create(result_tuple_desc_->byte_size(), mem_pool_.get()); + curr_tuple_init_ = true; // Check for failures during AggFnEvaluator::Init(). RETURN_IF_ERROR(state->GetQueryStatus()); // Initialize state for the first partition. RETURN_IF_ERROR(InitNextPartition(state, 0)); - curr_child_batch_.reset(new RowBatch(child(0)->row_desc(), state->batch_size(), - mem_tracker())); + if (curr_child_batch_ == nullptr) { + curr_child_batch_.reset( + new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker())); + } return Status::OK(); } @@ -364,11 +357,11 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) { Status AnalyticEvalNode::AddResultTuple(int64_t stream_idx) { VLOG_ROW << id() << " AddResultTuple idx=" << stream_idx; DCHECK(curr_tuple_ != nullptr); - MemPool* cur_tuple_pool = curr_tuple_pool_.get(); - Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(), cur_tuple_pool); + MemPool* curr_tuple_pool = curr_tuple_pool_.get(); + Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(), curr_tuple_pool); AggFnEvaluator::GetValue(analytic_fn_evals_, curr_tuple_, result_tuple); - // Copy any string data in 'result_tuple' into 'cur_tuple_pool'. The var-len data + // Copy any string data in 'result_tuple' into 'curr_tuple_pool'. The var-len data // returned by GetValue() may be backed by an allocation from // 'expr_results_pool_' that will be recycled so it must be copied out. 
for (const SlotDescriptor* slot_desc : result_tuple_desc_->string_slots()) { @@ -376,9 +369,9 @@ Status AnalyticEvalNode::AddResultTuple(int64_t stream_idx) { StringValue* sv = result_tuple->GetStringSlot(slot_desc->tuple_offset()); if (sv->len == 0) continue; char* new_ptr = reinterpret_cast<char*>( - cur_tuple_pool->TryAllocateUnaligned(sv->len)); + curr_tuple_pool->TryAllocateUnaligned(sv->len)); if (UNLIKELY(new_ptr == nullptr)) { - return cur_tuple_pool->mem_tracker()->MemLimitExceeded(nullptr, + return curr_tuple_pool->mem_tracker()->MemLimitExceeded(nullptr, "Failed to allocate memory for analytic function's result.", sv->len); } memcpy(new_ptr, sv->ptr, sv->len); @@ -539,8 +532,10 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state, VLOG_ROW << id() << " Reset curr_tuple"; // Call finalize to release resources; result is not needed but the dst tuple must be // a tuple described by result_tuple_desc_. + DCHECK(curr_tuple_init_); AggFnEvaluator::Finalize(analytic_fn_evals_, curr_tuple_, dummy_result_tuple_); // Re-initialize curr_tuple_. + // TODO: zeroing out curr_tuple_ shouldn't be strictly necessary. curr_tuple_->Init(intermediate_tuple_desc_->byte_size()); AggFnEvaluator::Init(analytic_fn_evals_, curr_tuple_); // Check for errors in AggFnEvaluator::Init(). @@ -608,7 +603,6 @@ Status AnalyticEvalNode::ProcessChildBatches(RuntimeState* state) { // than 2x the batch size unless the end bound has an offset preceding, in which // case it may be slightly larger (proportional to the offset but still bounded). } - if (input_eos_) curr_child_batch_.reset(); return Status::OK(); } @@ -786,7 +780,7 @@ Status AnalyticEvalNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* bool output_eos = false; RETURN_IF_ERROR(GetNextOutputBatch(state, row_batch, &output_eos)); - if (curr_child_batch_.get() == nullptr && output_eos) { + if (input_eos_ && output_eos) { // Transfer the ownership of all row-backing resources on eos for simplicity. 
// TODO: This transfer is simple and correct, but not necessarily efficient. We // should optimize the use/transfer of memory to better amortize allocations @@ -830,20 +824,17 @@ Status AnalyticEvalNode::Reset(RuntimeState* state) { DCHECK_EQ(prev_tuple_pool_->total_allocated_bytes(), 0); // Call Finalize() to clear evaluator allocations, but do not Close() them, // so we can keep evaluating them. - if (curr_tuple_ != nullptr) { - for (int i = 0; i < analytic_fn_evals_.size(); ++i) { - analytic_fn_evals_[i]->Finalize(curr_tuple_, dummy_result_tuple_); - } + if (curr_tuple_init_) { + AggFnEvaluator::Finalize(analytic_fn_evals_, curr_tuple_, dummy_result_tuple_); + curr_tuple_init_ = false; } - mem_pool_->Clear(); // The following members will be re-created in Open(). // input_stream_ should have been closed by last GetNext() call. DCHECK(input_stream_ == nullptr || input_stream_->is_closed()); input_stream_.reset(); - curr_tuple_ = nullptr; - dummy_result_tuple_ = nullptr; prev_input_tuple_ = nullptr; - curr_child_batch_.reset(); + prev_input_tuple_pool_->Clear(); + curr_child_batch_->Reset(); return ExecNode::Reset(state); } @@ -860,13 +851,11 @@ void AnalyticEvalNode::Close(RuntimeState* state) { DCHECK_LE(analytic_fn_evals_.size(), analytic_fns_.size()); DCHECK(curr_tuple_ == nullptr || analytic_fn_evals_.size() == analytic_fns_.size()); - for (int i = 0; i < analytic_fn_evals_.size(); ++i) { - // Need to make sure finalize is called in case there is any state to clean up. - if (curr_tuple_ != nullptr) { - analytic_fn_evals_[i]->Finalize(curr_tuple_, dummy_result_tuple_); - } - analytic_fn_evals_[i]->Close(state); + // Need to make sure finalize is called in case there is any state to clean up. 
+ if (curr_tuple_init_) { + AggFnEvaluator::Finalize(analytic_fn_evals_, curr_tuple_, dummy_result_tuple_); } + AggFnEvaluator::Close(analytic_fn_evals_, state); AggFn::Close(analytic_fns_); if (partition_by_eq_expr_ != nullptr) { @@ -882,7 +871,6 @@ void AnalyticEvalNode::Close(RuntimeState* state) { if (curr_child_batch_.get() != nullptr) curr_child_batch_.reset(); if (curr_tuple_pool_.get() != nullptr) curr_tuple_pool_->FreeAll(); if (prev_tuple_pool_.get() != nullptr) prev_tuple_pool_->FreeAll(); - if (mem_pool_.get() != nullptr) mem_pool_->FreeAll(); if (prev_input_tuple_pool_.get() != nullptr) prev_input_tuple_pool_->FreeAll(); ExecNode::Close(state); } http://git-wip-us.apache.org/repos/asf/impala/blob/884fcf81/be/src/exec/analytic-eval-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h index 696969a..0c185ae 100644 --- a/be/src/exec/analytic-eval-node.h +++ b/be/src/exec/analytic-eval-node.h @@ -234,8 +234,8 @@ class AnalyticEvalNode : public ExecNode { /// Offset from the current row for ROWS windows with start or end bounds specified /// with offsets. Is positive if the offset is FOLLOWING, negative if PRECEDING, and 0 /// if type is CURRENT ROW or UNBOUNDED PRECEDING/FOLLOWING. - int64_t rows_start_offset_; - int64_t rows_end_offset_; + int64_t rows_start_offset_ = 0; + int64_t rows_end_offset_ = 0; /// Analytic functions and their evaluators. 'analytic_fns_' live in the query-state's /// objpool while the evaluators live in the exec node's objpool. @@ -248,8 +248,8 @@ class AnalyticEvalNode : public ExecNode { /// If true, evaluating FIRST_VALUE requires special null handling when initializing new /// partitions determined by the offset. Set in Open() by inspecting the agg fns. 
- bool has_first_val_null_offset_; - long first_val_null_offset_; + bool has_first_val_null_offset_ = false; + long first_val_null_offset_ = 0; /// Pools used to allocate result tuples (added to result_tuples_ and later returned) /// and window tuples (added to window_tuples_ to buffer the current window). Resources @@ -260,6 +260,12 @@ class AnalyticEvalNode : public ExecNode { boost::scoped_ptr<MemPool> curr_tuple_pool_; boost::scoped_ptr<MemPool> prev_tuple_pool_; + /// A tuple described by result_tuple_desc_ used when calling Finalize() on the + /// analytic_fn_evals_ to release resources between partitions; the value is never used. + /// Owned by expr_perm_pool_ and allocated in Prepare() + /// TODO: Remove when agg fns implement a separate Close() method to release resources. + Tuple* dummy_result_tuple_ = nullptr; + ///////////////////////////////////////// /// BEGIN: Members that must be Reset() @@ -278,7 +284,7 @@ class AnalyticEvalNode : public ExecNode { std::deque<std::pair<int64_t, Tuple*>> result_tuples_; /// Index in input_stream_ of the most recently added result tuple. - int64_t last_result_idx_; + int64_t last_result_idx_ = -1; /// Child tuples that are currently within the window and the index into input_stream_ /// of the row they're associated with. Only used when window start bound is PRECEDING @@ -291,28 +297,28 @@ class AnalyticEvalNode : public ExecNode { /// resources in prev_tuple_pool_. -1 when the pool is empty. Resources from /// prev_tuple_pool_ can only be transferred to an output batch once all rows containing /// these tuples have been returned. - int64_t prev_pool_last_result_idx_; + int64_t prev_pool_last_result_idx_ = -1; /// The index of the last row from input_stream_ associated with window tuples /// containing resources in prev_tuple_pool_. -1 when the pool is empty. 
Resources from /// prev_tuple_pool_ can only be transferred to an output batch once all rows containing /// these tuples are no longer needed (removed from the window_tuples_). - int64_t prev_pool_last_window_idx_; + int64_t prev_pool_last_window_idx_ = -1; /// The tuple described by intermediate_tuple_desc_ storing intermediate state for the /// analytic_eval_fns_. When enough input rows have been consumed to produce the /// analytic function results, a result tuple (described by result_tuple_desc_) is /// created and the agg fn results are written to that tuple by calling Finalize()/ - /// GetValue() on the evaluators with curr_tuple_ as the source tuple. + /// GetValue() on the evaluators with curr_tuple_ as the source tuple. Owned by + /// expr_perm_pool_, allocated in Prepare() and initialized in Open(). Tuple* curr_tuple_ = nullptr; - /// A tuple described by result_tuple_desc_ used when calling Finalize() on the - /// analytic_fn_evals_ to release resources between partitions; the value is never used. - /// TODO: Remove when agg fns implement a separate Close() method to release resources. - Tuple* dummy_result_tuple_ = nullptr; + /// True if AggFnEvaluator::Init() was called on 'curr_tuple_', which means that + /// AggFnEvaluator::Finalize() needs to be called on it. + bool curr_tuple_init_ = false; /// Index of the row in input_stream_ at which the current partition started. - int64_t curr_partition_idx_; + int64_t curr_partition_idx_ = -1; /// Previous input tuple used to compare partition boundaries and to determine when the /// order-by expressions change. We only need to store the first tuple of the row @@ -339,19 +345,14 @@ class AnalyticEvalNode : public ExecNode { /// TODO: Consider re-pinning unpinned streams when possible. boost::scoped_ptr<BufferedTupleStream> input_stream_; - /// Pool used for O(1) allocations that live until Close() or Reset(). 
- /// Does not own data backing tuples returned in GetNext(), so it does not - /// need to be transferred to an output batch. - boost::scoped_ptr<MemPool> mem_pool_; - /// True when there are no more input rows to consume from our child. - bool input_eos_; + bool input_eos_ = false; /// END: Members that must be Reset() ///////////////////////////////////////// /// Time spent processing the child rows. - RuntimeProfile::Counter* evaluation_timer_; + RuntimeProfile::Counter* evaluation_timer_ = nullptr; }; }