Repository: impala
Updated Branches:
  refs/heads/master 4ab1b9245 -> ea615d1d8


Analytic mem mgmt cleanup

Following on from IMPALA-7403, clean up some of the memory
management in AnalyticEvalNode.

Remove an unnecessary MemPool and avoid needlessly reallocating
tuples in Reset(). Also switch to inline initialization of members.
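
As a rough illustration of the inline-initialization pattern mentioned
above (a simplified, hypothetical class, not the actual AnalyticEvalNode
members):

    #include <cstdint>

    // Before: every default is spelled out in the constructor's initializer
    // list, and a member accidentally omitted there is left uninitialized.
    class NodeBefore {
     public:
      NodeBefore()
        : last_result_idx_(-1), curr_partition_idx_(-1), input_eos_(false) {}
     private:
      int64_t last_result_idx_;
      int64_t curr_partition_idx_;
      bool input_eos_;
    };

    // After: C++11 non-static data member initializers give every constructor
    // the same defaults without repeating them, so the initializer list can
    // be dropped entirely.
    class NodeAfter {
     public:
      NodeAfter() = default;
     private:
      int64_t last_result_idx_ = -1;
      int64_t curr_partition_idx_ = -1;
      bool input_eos_ = false;
    };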

Testing:
Ran exhaustive tests

Change-Id: Id16beb2d0b9d4315f52dd45203649252c0eb06b0
Reviewed-on: http://gerrit.cloudera.org:8080/11203
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/884fcf81
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/884fcf81
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/884fcf81

Branch: refs/heads/master
Commit: 884fcf81b8f0df949ee615c6b99694c01500da08
Parents: 4ab1b92
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Tue Aug 7 14:17:10 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Tue Aug 14 01:37:44 2018 +0000

----------------------------------------------------------------------
 be/src/exec/analytic-eval-node.cc | 78 ++++++++++++++--------------------
 be/src/exec/analytic-eval-node.h  | 41 +++++++++---------
 2 files changed, 54 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/884fcf81/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index 68170a2..036cf93 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -48,20 +48,7 @@ AnalyticEvalNode::AnalyticEvalNode(
     intermediate_tuple_desc_(
         descs.GetTupleDescriptor(tnode.analytic_node.intermediate_tuple_id)),
     result_tuple_desc_(
-        descs.GetTupleDescriptor(tnode.analytic_node.output_tuple_id)),
-    rows_start_offset_(0),
-    rows_end_offset_(0),
-    has_first_val_null_offset_(false),
-    first_val_null_offset_(0),
-    last_result_idx_(-1),
-    prev_pool_last_result_idx_(-1),
-    prev_pool_last_window_idx_(-1),
-    curr_tuple_(nullptr),
-    dummy_result_tuple_(nullptr),
-    curr_partition_idx_(-1),
-    input_stream_(nullptr),
-    input_eos_(false),
-    evaluation_timer_(nullptr) {
+        descs.GetTupleDescriptor(tnode.analytic_node.output_tuple_id)) {
   if (tnode.analytic_node.__isset.buffered_tuple_id) {
     buffered_tuple_desc_ = descs.GetTupleDescriptor(
         tnode.analytic_node.buffered_tuple_id);
@@ -159,7 +146,6 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) {
       resource_profile_.spillable_buffer_size * MIN_REQUIRED_BUFFERS);
   curr_tuple_pool_.reset(new MemPool(mem_tracker()));
   prev_tuple_pool_.reset(new MemPool(mem_tracker()));
-  mem_pool_.reset(new MemPool(mem_tracker()));
   prev_input_tuple_pool_.reset(new MemPool(mem_tracker()));
   evaluation_timer_ = ADD_TIMER(runtime_profile(), "EvaluationTime");
 
@@ -176,6 +162,13 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ScalarExprEvaluator::Create(*order_by_eq_expr_, state, pool_,
         expr_perm_pool(), expr_results_pool(), &order_by_eq_expr_eval_));
   }
+
+  // An intermediate tuple that is only allocated once and is reused. 'curr_tuple_' is
+  // initialized in Open() before it is used.
+  curr_tuple_ =
+      Tuple::Create(intermediate_tuple_desc_->byte_size(), expr_perm_pool_.get());
+  dummy_result_tuple_ =
+      Tuple::Create(result_tuple_desc_->byte_size(), expr_perm_pool_.get());
   return Status::OK();
 }
 
@@ -222,20 +215,20 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
     RETURN_IF_ERROR(order_by_eq_expr_eval_->Open(state));
   }
 
-  // An intermediate tuple is only allocated once and is reused.
-  curr_tuple_ = Tuple::Create(intermediate_tuple_desc_->byte_size(), mem_pool_.get());
+  // Initialize the tuple that was allocated in Prepare().
+  // TODO: zeroing out curr_tuple_ shouldn't be strictly necessary.
+  curr_tuple_->Init(intermediate_tuple_desc_->byte_size());
   AggFnEvaluator::Init(analytic_fn_evals_, curr_tuple_);
-  // Allocate dummy_result_tuple_ even if AggFnEvaluator::Init() may have failed
-  // as it is needed in Close().
-  // TODO: move this to Prepare()
-  dummy_result_tuple_ = Tuple::Create(result_tuple_desc_->byte_size(), mem_pool_.get());
+  curr_tuple_init_ = true;
   // Check for failures during AggFnEvaluator::Init().
   RETURN_IF_ERROR(state->GetQueryStatus());
 
   // Initialize state for the first partition.
   RETURN_IF_ERROR(InitNextPartition(state, 0));
-  curr_child_batch_.reset(new RowBatch(child(0)->row_desc(), state->batch_size(),
-      mem_tracker()));
+  if (curr_child_batch_ == nullptr) {
+    curr_child_batch_.reset(
+        new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
+  }
   return Status::OK();
 }
 
@@ -364,11 +357,11 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
 Status AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
   VLOG_ROW << id() << " AddResultTuple idx=" << stream_idx;
   DCHECK(curr_tuple_ != nullptr);
-  MemPool* cur_tuple_pool = curr_tuple_pool_.get();
-  Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(), cur_tuple_pool);
+  MemPool* curr_tuple_pool = curr_tuple_pool_.get();
+  Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(), curr_tuple_pool);
 
   AggFnEvaluator::GetValue(analytic_fn_evals_, curr_tuple_, result_tuple);
-  // Copy any string data in 'result_tuple' into 'cur_tuple_pool'. The var-len data
+  // Copy any string data in 'result_tuple' into 'curr_tuple_pool'. The var-len data
   // returned by GetValue() may be backed by an allocation from
   // 'expr_results_pool_' that will be recycled so it must be copied out.
   for (const SlotDescriptor* slot_desc : result_tuple_desc_->string_slots()) {
@@ -376,9 +369,9 @@ Status AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
     StringValue* sv = result_tuple->GetStringSlot(slot_desc->tuple_offset());
     if (sv->len == 0) continue;
     char* new_ptr = reinterpret_cast<char*>(
-        cur_tuple_pool->TryAllocateUnaligned(sv->len));
+        curr_tuple_pool->TryAllocateUnaligned(sv->len));
     if (UNLIKELY(new_ptr == nullptr)) {
-      return cur_tuple_pool->mem_tracker()->MemLimitExceeded(nullptr,
+      return curr_tuple_pool->mem_tracker()->MemLimitExceeded(nullptr,
           "Failed to allocate memory for analytic function's result.", 
sv->len);
     }
     memcpy(new_ptr, sv->ptr, sv->len);
@@ -539,8 +532,10 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state,
   VLOG_ROW << id() << " Reset curr_tuple";
   // Call finalize to release resources; result is not needed but the dst tuple must be
   // a tuple described by result_tuple_desc_.
+  DCHECK(curr_tuple_init_);
   AggFnEvaluator::Finalize(analytic_fn_evals_, curr_tuple_, dummy_result_tuple_);
   // Re-initialize curr_tuple_.
+  // TODO: zeroing out curr_tuple_ shouldn't be strictly necessary.
   curr_tuple_->Init(intermediate_tuple_desc_->byte_size());
   AggFnEvaluator::Init(analytic_fn_evals_, curr_tuple_);
   // Check for errors in AggFnEvaluator::Init().
@@ -608,7 +603,6 @@ Status AnalyticEvalNode::ProcessChildBatches(RuntimeState* state) {
     // than 2x the batch size unless the end bound has an offset preceding, in which
     // case it may be slightly larger (proportional to the offset but still bounded).
   }
-  if (input_eos_) curr_child_batch_.reset();
   return Status::OK();
 }
 
@@ -786,7 +780,7 @@ Status AnalyticEvalNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
 
   bool output_eos = false;
   RETURN_IF_ERROR(GetNextOutputBatch(state, row_batch, &output_eos));
-  if (curr_child_batch_.get() == nullptr && output_eos) {
+  if (input_eos_ && output_eos) {
     // Transfer the ownership of all row-backing resources on eos for simplicity.
     // TODO: This transfer is simple and correct, but not necessarily efficient. We
     // should optimize the use/transfer of memory to better amortize allocations
@@ -830,20 +824,17 @@ Status AnalyticEvalNode::Reset(RuntimeState* state) {
   DCHECK_EQ(prev_tuple_pool_->total_allocated_bytes(), 0);
   // Call Finalize() to clear evaluator allocations, but do not Close() them,
   // so we can keep evaluating them.
-  if (curr_tuple_ != nullptr) {
-    for (int i = 0; i < analytic_fn_evals_.size(); ++i) {
-      analytic_fn_evals_[i]->Finalize(curr_tuple_, dummy_result_tuple_);
-    }
+  if (curr_tuple_init_) {
+    AggFnEvaluator::Finalize(analytic_fn_evals_, curr_tuple_, dummy_result_tuple_);
+    curr_tuple_init_ = false;
   }
-  mem_pool_->Clear();
   // The following members will be re-created in Open().
   // input_stream_ should have been closed by last GetNext() call.
   DCHECK(input_stream_ == nullptr || input_stream_->is_closed());
   input_stream_.reset();
-  curr_tuple_ = nullptr;
-  dummy_result_tuple_ = nullptr;
   prev_input_tuple_ = nullptr;
-  curr_child_batch_.reset();
+  prev_input_tuple_pool_->Clear();
+  curr_child_batch_->Reset();
   return ExecNode::Reset(state);
 }
 
@@ -860,13 +851,11 @@ void AnalyticEvalNode::Close(RuntimeState* state) {
   DCHECK_LE(analytic_fn_evals_.size(), analytic_fns_.size());
   DCHECK(curr_tuple_ == nullptr ||
       analytic_fn_evals_.size() == analytic_fns_.size());
-  for (int i = 0; i < analytic_fn_evals_.size(); ++i) {
-    // Need to make sure finalize is called in case there is any state to clean up.
-    if (curr_tuple_ != nullptr)  {
-      analytic_fn_evals_[i]->Finalize(curr_tuple_, dummy_result_tuple_);
-    }
-    analytic_fn_evals_[i]->Close(state);
+  // Need to make sure finalize is called in case there is any state to clean up.
+  if (curr_tuple_init_)  {
+    AggFnEvaluator::Finalize(analytic_fn_evals_, curr_tuple_, dummy_result_tuple_);
   }
+  AggFnEvaluator::Close(analytic_fn_evals_, state);
   AggFn::Close(analytic_fns_);
 
   if (partition_by_eq_expr_ != nullptr) {
@@ -882,7 +871,6 @@ void AnalyticEvalNode::Close(RuntimeState* state) {
   if (curr_child_batch_.get() != nullptr) curr_child_batch_.reset();
   if (curr_tuple_pool_.get() != nullptr) curr_tuple_pool_->FreeAll();
   if (prev_tuple_pool_.get() != nullptr) prev_tuple_pool_->FreeAll();
-  if (mem_pool_.get() != nullptr) mem_pool_->FreeAll();
   if (prev_input_tuple_pool_.get() != nullptr) prev_input_tuple_pool_->FreeAll();
   ExecNode::Close(state);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/884fcf81/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index 696969a..0c185ae 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -234,8 +234,8 @@ class AnalyticEvalNode : public ExecNode {
   /// Offset from the current row for ROWS windows with start or end bounds specified
   /// with offsets. Is positive if the offset is FOLLOWING, negative if PRECEDING, and 0
   /// if type is CURRENT ROW or UNBOUNDED PRECEDING/FOLLOWING.
-  int64_t rows_start_offset_;
-  int64_t rows_end_offset_;
+  int64_t rows_start_offset_ = 0;
+  int64_t rows_end_offset_ = 0;
 
   /// Analytic functions and their evaluators. 'analytic_fns_' live in the query-state's
   /// objpool while the evaluators live in the exec node's objpool.
@@ -248,8 +248,8 @@ class AnalyticEvalNode : public ExecNode {
 
   /// If true, evaluating FIRST_VALUE requires special null handling when initializing new
   /// partitions determined by the offset. Set in Open() by inspecting the agg fns.
-  bool has_first_val_null_offset_;
-  long first_val_null_offset_;
+  bool has_first_val_null_offset_ = false;
+  long first_val_null_offset_ = 0;
 
   /// Pools used to allocate result tuples (added to result_tuples_ and later returned)
   /// and window tuples (added to window_tuples_ to buffer the current window). Resources
@@ -260,6 +260,12 @@ class AnalyticEvalNode : public ExecNode {
   boost::scoped_ptr<MemPool> curr_tuple_pool_;
   boost::scoped_ptr<MemPool> prev_tuple_pool_;
 
+  /// A tuple described by result_tuple_desc_ used when calling Finalize() on the
+  /// analytic_fn_evals_ to release resources between partitions; the value is never used.
+  /// Owned by expr_perm_pool_ and allocated in Prepare()
+  /// TODO: Remove when agg fns implement a separate Close() method to release resources.
+  Tuple* dummy_result_tuple_ = nullptr;
+
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
@@ -278,7 +284,7 @@ class AnalyticEvalNode : public ExecNode {
   std::deque<std::pair<int64_t, Tuple*>> result_tuples_;
 
   /// Index in input_stream_ of the most recently added result tuple.
-  int64_t last_result_idx_;
+  int64_t last_result_idx_ = -1;
 
   /// Child tuples that are currently within the window and the index into input_stream_
   /// of the row they're associated with. Only used when window start bound is PRECEDING
@@ -291,28 +297,28 @@ class AnalyticEvalNode : public ExecNode {
   /// resources in prev_tuple_pool_. -1 when the pool is empty. Resources from
   /// prev_tuple_pool_ can only be transferred to an output batch once all rows containing
   /// these tuples have been returned.
-  int64_t prev_pool_last_result_idx_;
+  int64_t prev_pool_last_result_idx_ = -1;
 
   /// The index of the last row from input_stream_ associated with window tuples
   /// containing resources in prev_tuple_pool_. -1 when the pool is empty. Resources from
   /// prev_tuple_pool_ can only be transferred to an output batch once all rows containing
   /// these tuples are no longer needed (removed from the window_tuples_).
-  int64_t prev_pool_last_window_idx_;
+  int64_t prev_pool_last_window_idx_ = -1;
 
   /// The tuple described by intermediate_tuple_desc_ storing intermediate state for the
   /// analytic_eval_fns_. When enough input rows have been consumed to produce the
   /// analytic function results, a result tuple (described by result_tuple_desc_) is
   /// created and the agg fn results are written to that tuple by calling Finalize()/
-  /// GetValue() on the evaluators with curr_tuple_ as the source tuple.
+  /// GetValue() on the evaluators with curr_tuple_ as the source tuple. Owned by
+  /// expr_perm_pool_, allocated in Prepare() and initialized in Open().
   Tuple* curr_tuple_ = nullptr;
 
-  /// A tuple described by result_tuple_desc_ used when calling Finalize() on the
-  /// analytic_fn_evals_ to release resources between partitions; the value is never used.
-  /// TODO: Remove when agg fns implement a separate Close() method to release resources.
-  Tuple* dummy_result_tuple_ = nullptr;
+  /// True if AggFnEvaluator::Init() was called on 'curr_tuple_', which means that
+  /// AggFnEvaluator::Finalize() needs to be called on it.
+  bool curr_tuple_init_ = false;
 
   /// Index of the row in input_stream_ at which the current partition started.
-  int64_t curr_partition_idx_;
+  int64_t curr_partition_idx_ = -1;
 
   /// Previous input tuple used to compare partition boundaries and to determine when the
   /// order-by expressions change. We only need to store the first tuple of the row
@@ -339,19 +345,14 @@ class AnalyticEvalNode : public ExecNode {
   /// TODO: Consider re-pinning unpinned streams when possible.
   boost::scoped_ptr<BufferedTupleStream> input_stream_;
 
-  /// Pool used for O(1) allocations that live until Close() or Reset().
-  /// Does not own data backing tuples returned in GetNext(), so it does not
-  /// need to be transferred to an output batch.
-  boost::scoped_ptr<MemPool> mem_pool_;
-
   /// True when there are no more input rows to consume from our child.
-  bool input_eos_;
+  bool input_eos_ = false;
 
   /// END: Members that must be Reset()
   /////////////////////////////////////////
 
   /// Time spent processing the child rows.
-  RuntimeProfile::Counter* evaluation_timer_;
+  RuntimeProfile::Counter* evaluation_timer_ = nullptr;
 };
 
 }
