IMPALA-5844: use a MemPool for expr result allocations

This is also a step towards IMPALA-2399 (remove QueryMaintenance()).

"local" allocations containing expression results (either intermediate
or final results) have the following properties:
* They are usually small allocations
* They can be made frequently (e.g. every function call)
* They are owned and managed by the Impala runtime
* They are freed in bulk at various points in query execution.

A MemPool (i.e. bump allocator) is the right mechanism to manage
allocations with the above properties. Before this patch,
FunctionContexts used a FreePool plus a vector of allocations to emulate the
above behaviour. This patch switches to using a MemPool to bring these
allocations in line with the rest of the codebase.
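
For readers unfamiliar with bump allocation, the following is a minimal
sketch of the idea, not Impala's actual MemPool. The class name BumpPool and
all of its members are hypothetical and exist only for illustration. It
shows why small, frequent allocations that are freed in bulk fit this
model: allocating is a pointer bump, and Clear() releases everything at
once while keeping the chunks for reuse.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical, simplified bump allocator used only to illustrate the idea.
class BumpPool {
 public:
  explicit BumpPool(std::size_t chunk_size) : chunk_size_(chunk_size) {
    assert(chunk_size_ > 0);
  }

  // Returns 'size' bytes by bumping an offset within the current chunk.
  // Moves on to the next retained chunk, or appends a new one, if needed.
  std::uint8_t* Allocate(std::size_t size) {
    while (current_ < chunks_.size()) {
      Chunk& c = chunks_[current_];
      if (c.used + size <= c.capacity) {
        std::uint8_t* result = c.data.get() + c.used;
        c.used += size;
        total_allocated_ += size;
        return result;
      }
      ++current_;  // Remaining space in this chunk is skipped until Clear().
    }
    const std::size_t capacity = std::max(chunk_size_, size);
    Chunk c{std::unique_ptr<std::uint8_t[]>(new std::uint8_t[capacity]),
            capacity, size};
    std::uint8_t* result = c.data.get();
    chunks_.push_back(std::move(c));
    total_allocated_ += size;
    return result;
  }

  // Frees every allocation in bulk. Chunks are retained for reuse, so all
  // pointers handed out earlier must be treated as invalid after this call.
  void Clear() {
    for (Chunk& c : chunks_) c.used = 0;
    current_ = 0;
    total_allocated_ = 0;
  }

  std::size_t total_allocated() const { return total_allocated_; }
  std::size_t num_chunks() const { return chunks_.size(); }

 private:
  struct Chunk {
    std::unique_ptr<std::uint8_t[]> data;
    std::size_t capacity;
    std::size_t used;
  };

  std::size_t chunk_size_;
  std::vector<Chunk> chunks_;
  std::size_t current_ = 0;
  std::size_t total_allocated_ = 0;
};
```

In this sketch there is no per-allocation free, which is what removes the
need for the vector of outstanding allocations that the FreePool-based
approach had to maintain.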

The steps required for this conversion:
* Use a MemPool for FunctionContext local allocations.
* Identify appropriate MemPools for all of the local allocations from
  function contexts so that the memory lifetime is correct.
* Various cleanup and documentation of existing MemPools.
* Replace calls to FreeLocalAllocations() with calls to
  MemPool::Clear().

More involved surgery was required in a few places:
* Make the Sorter own its comparator, exprs and MemPool.
* Remove FunctionContextImpl::ReallocateLocal() and just have
  StringFunctions::Replace() do the doubling itself to avoid
  the need for a special interface. Worst-case this doubles
  the memory requirements for Replace() since n / 2 + n / 4
  + n / 8 + .... bytes of memory could be wasted instead of recycled
  for an n-byte output string.
* Provide a way to redirect agg fn Serialize()/Finalize() allocations
  to come directly from the output RowBatch's MemPool. This is
  also potentially applicable to other places where we currently
  copy out strings from local allocations, e.g.
  AnalyticEvalNode::AddResultTuple() and Tuple::MaterializeExprs().
* --stress_free_pool_alloc was renamed to --stress_fn_ctx_alloc and
  changed to intercept at the FunctionContext layer so that it retains
  the old behaviour even though allocations no longer all come from
  FreePools.
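
The worst-case estimate in the Replace() bullet above can be checked with a
short simulation. This is a sketch under stated assumptions, not code from
the patch: SimulateDoubling and GrowthStats are hypothetical names, and the
model assumes each outgrown buffer stays allocated until the pool is next
cleared.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper types for illustration only.
struct GrowthStats {
  std::size_t final_capacity;  // Capacity of the buffer that is finally used.
  std::size_t wasted_bytes;    // Sum of the capacities of abandoned buffers.
};

// Simulates growing an output buffer by doubling, starting at 'initial'
// bytes, until it can hold 'needed' bytes. Each abandoned buffer is counted
// as wasted because a bump allocator cannot recycle it before a bulk clear.
inline GrowthStats SimulateDoubling(std::size_t initial, std::size_t needed) {
  assert(initial > 0);
  GrowthStats stats{initial, 0};
  while (stats.final_capacity < needed) {
    stats.wasted_bytes += stats.final_capacity;
    stats.final_capacity *= 2;
  }
  return stats;
}
```

For example, SimulateDoubling(16, 1024) ends with a 1024-byte buffer and
16 + 32 + ... + 512 = 1008 abandoned bytes. The waste is a geometric series
that stays below the final capacity, which is where the "doubles the memory
requirements" bound comes from.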

The "local" allocation concept was not exposed directly in udf.h so this
patch also renames them to better reflect that they're used for expr
results.

Testing:
* ran exhaustive and ASAN

Change-Id: I4ba5a7542ed90a49a4b5586c040b5985a7d45b61
Reviewed-on: http://gerrit.cloudera.org:8080/8025
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c14a0904
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c14a0904
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c14a0904

Branch: refs/heads/master
Commit: c14a0904005ce150e15820c8bb9ade94fb58ed56
Parents: d8bdea5
Author: Tim Armstrong <[email protected]>
Authored: Mon Jul 24 22:51:05 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Oct 6 00:01:08 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/expr-benchmark.cc             |   3 +-
 be/src/common/global-flags.cc                   |   6 +-
 be/src/exec/analytic-eval-node.cc               |  31 ++--
 be/src/exec/analytic-eval-node.h                |   8 -
 be/src/exec/data-sink.cc                        |   8 +-
 be/src/exec/data-sink.h                         |  11 +-
 be/src/exec/exchange-node.cc                    |  10 +-
 be/src/exec/exchange-node.h                     |   1 -
 be/src/exec/exec-node.cc                        |  21 +--
 be/src/exec/exec-node.h                         |  43 ++---
 be/src/exec/filter-context.cc                   |   5 +-
 be/src/exec/filter-context.h                    |   2 +-
 be/src/exec/hash-table-test.cc                  |  24 ++-
 be/src/exec/hash-table.cc                       |  38 ++---
 be/src/exec/hash-table.h                        |  40 +++--
 be/src/exec/hbase-table-sink.cc                 |   2 +-
 be/src/exec/hdfs-parquet-scanner.cc             |  30 ++--
 be/src/exec/hdfs-parquet-scanner.h              |   5 -
 be/src/exec/hdfs-scan-node-base.cc              |   4 +-
 be/src/exec/hdfs-scan-node-mt.cc                |   3 +-
 be/src/exec/hdfs-scan-node.cc                   |  15 +-
 be/src/exec/hdfs-scan-node.h                    |   2 +-
 be/src/exec/hdfs-scanner.cc                     |  13 +-
 be/src/exec/hdfs-scanner.h                      |   8 +-
 be/src/exec/hdfs-table-sink.cc                  |   6 +-
 be/src/exec/kudu-scanner.cc                     |  12 +-
 be/src/exec/kudu-scanner.h                      |  11 +-
 be/src/exec/kudu-table-sink.cc                  |   2 +-
 be/src/exec/nested-loop-join-node.cc            |   4 +-
 be/src/exec/partial-sort-node.cc                |  18 +-
 be/src/exec/partial-sort-node.h                 |   4 -
 be/src/exec/partitioned-aggregation-node.cc     | 128 +++++---------
 be/src/exec/partitioned-aggregation-node.h      |  41 ++---
 be/src/exec/partitioned-hash-join-builder.cc    |  30 ++--
 be/src/exec/partitioned-hash-join-builder.h     |   5 +-
 be/src/exec/partitioned-hash-join-node.cc       |  43 ++---
 be/src/exec/partitioned-hash-join-node.h        |   6 +-
 be/src/exec/plan-root-sink.cc                   |   3 +-
 be/src/exec/scan-node.cc                        |   3 +-
 be/src/exec/scanner-context.cc                  |   5 +-
 be/src/exec/scanner-context.h                   |  17 +-
 be/src/exec/sort-node.cc                        |  18 +-
 be/src/exec/sort-node.h                         |   4 -
 be/src/exec/topn-node.cc                        |  11 +-
 be/src/exec/topn-node.h                         |   1 -
 be/src/exec/union-node.cc                       |   6 +-
 be/src/exec/unnest-node.cc                      |   2 +-
 be/src/exprs/agg-fn-evaluator.cc                |  59 ++++---
 be/src/exprs/agg-fn-evaluator.h                 | 121 ++++++++-----
 be/src/exprs/case-expr.cc                       |   4 +-
 be/src/exprs/cast-functions-ir.cc               |   2 +-
 be/src/exprs/expr-test.cc                       |  12 +-
 be/src/exprs/hive-udf-call.cc                   |   4 +-
 be/src/exprs/scalar-expr-evaluator.cc           |  84 ++++-----
 be/src/exprs/scalar-expr-evaluator.h            |  69 ++++----
 be/src/exprs/scalar-fn-call.cc                  |   2 +-
 be/src/exprs/string-functions-ir.cc             |  35 ++--
 be/src/runtime/CMakeLists.txt                   |   1 -
 be/src/runtime/data-stream-sender.cc            |   5 +-
 be/src/runtime/data-stream-test.cc              |   3 +-
 be/src/runtime/descriptors.cc                   |   2 +-
 be/src/runtime/free-pool.cc                     |  28 ---
 be/src/runtime/free-pool.h                      |  20 ---
 be/src/runtime/sorted-run-merger.cc             |   3 -
 be/src/runtime/sorter.cc                        |  68 +++++---
 be/src/runtime/sorter.h                         |  38 +++--
 be/src/runtime/tuple.cc                         |   3 +-
 be/src/service/fe-support.cc                    |   2 +-
 be/src/udf/udf-internal.h                       |  72 ++++----
 be/src/udf/udf-test-harness.cc                  |   3 +-
 be/src/udf/udf.cc                               | 169 ++++++++++---------
 be/src/udf/udf.h                                |  34 ++--
 be/src/util/tuple-row-compare.cc                |  10 +-
 be/src/util/tuple-row-compare.h                 |  16 +-
 .../queries/QueryTest/alloc-fail-init.test      |   6 +-
 tests/custom_cluster/test_alloc_fail.py         |   4 +-
 76 files changed, 753 insertions(+), 839 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index 093d2b1..f1208f8 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -113,7 +113,8 @@ static Status PrepareSelectList(
   RuntimeState* state = planner->GetRuntimeState();
   ScalarExpr* expr;
   RETURN_IF_ERROR(ScalarExpr::Create(texprs[0], RowDescriptor(), state, &expr));
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(*expr, state, &pool, &mem_pool, eval));
+  RETURN_IF_ERROR(
+      ScalarExprEvaluator::Create(*expr, state, &pool, &mem_pool, &mem_pool, eval));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 1a8b027..5d3879f 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -120,9 +120,9 @@ DEFINE_bool(load_auth_to_local_rules, false, "If true, load auth_to_local config
 
 // Stress options that are only enabled in debug builds for testing.
 #ifndef NDEBUG
-DEFINE_int32(stress_free_pool_alloc, 0, "A stress option which causes memory allocations "
-    "to fail once every n allocations where n is the value of this flag. Effective in "
-    "debug builds only.");
+DEFINE_int32(stress_fn_ctx_alloc, 0, "A stress option which causes memory allocations "
+    "in function contexts to fail once every n allocations where n is the value of this "
+    "flag. Effective in debug builds only.");
 DEFINE_int32(stress_datastream_recvr_delay_ms, 0, "A stress option that causes data "
     "stream receiver registration to be delayed. Effective in debug builds only.");
 DEFINE_bool(skip_file_runtime_filtering, false, "Skips file-based runtime filtering in "

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index a3551f3..bc22d82 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -161,23 +161,20 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) {
   curr_tuple_pool_.reset(new MemPool(mem_tracker()));
   prev_tuple_pool_.reset(new MemPool(mem_tracker()));
   mem_pool_.reset(new MemPool(mem_tracker()));
-  fn_pool_.reset(new MemPool(expr_mem_tracker()));
   evaluation_timer_ = ADD_TIMER(runtime_profile(), "EvaluationTime");
 
   DCHECK_EQ(result_tuple_desc_->slots().size(), analytic_fns_.size());
-  RETURN_IF_ERROR(AggFnEvaluator::Create(analytic_fns_, state, pool_, fn_pool_.get(),
-      &analytic_fn_evals_));
+  RETURN_IF_ERROR(AggFnEvaluator::Create(analytic_fns_, state, pool_, expr_perm_pool(),
+      expr_results_pool(), &analytic_fn_evals_));
 
   if (partition_by_eq_expr_ != nullptr) {
     RETURN_IF_ERROR(ScalarExprEvaluator::Create(*partition_by_eq_expr_, state, pool_,
-        fn_pool_.get(), &partition_by_eq_expr_eval_));
-    AddEvaluatorToFree(partition_by_eq_expr_eval_);
+        expr_perm_pool(), expr_results_pool(), &partition_by_eq_expr_eval_));
   }
 
   if (order_by_eq_expr_ != nullptr) {
     RETURN_IF_ERROR(ScalarExprEvaluator::Create(*order_by_eq_expr_, state, pool_,
-        fn_pool_.get(), &order_by_eq_expr_eval_));
-    AddEvaluatorToFree(order_by_eq_expr_eval_);
+        expr_perm_pool(), expr_results_pool(), &order_by_eq_expr_eval_));
   }
   return Status::OK();
 }
@@ -382,12 +379,13 @@ Status AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
   Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(), cur_tuple_pool);
 
   AggFnEvaluator::GetValue(analytic_fn_evals_, curr_tuple_, result_tuple);
-  // Copy any string data in 'result_tuple' into 'cur_tuple_pool_'.
-  for (const SlotDescriptor* slot_desc : result_tuple_desc_->slots()) {
-    if (!slot_desc->type().IsVarLenStringType()) continue;
-    StringValue* sv = reinterpret_cast<StringValue*>(
-        result_tuple->GetSlot(slot_desc->tuple_offset()));
-    if (sv == nullptr || sv->len == 0) continue;
+  // Copy any string data in 'result_tuple' into 'cur_tuple_pool'. The var-len data
+  // returned by GetValue() may be backed by an allocation from
+  // 'expr_results_pool_' that will be recycled so it must be copied out.
+  for (const SlotDescriptor* slot_desc : result_tuple_desc_->string_slots()) {
+    if (result_tuple->IsNull(slot_desc->null_indicator_offset())) continue;
+    StringValue* sv = result_tuple->GetStringSlot(slot_desc->tuple_offset());
+    if (sv->len == 0) continue;
     char* new_ptr = reinterpret_cast<char*>(
         cur_tuple_pool->TryAllocateUnaligned(sv->len));
     if (UNLIKELY(new_ptr == nullptr)) {
@@ -889,7 +887,6 @@ void AnalyticEvalNode::Close(RuntimeState* state) {
   if (curr_tuple_pool_.get() != nullptr) curr_tuple_pool_->FreeAll();
   if (prev_tuple_pool_.get() != nullptr) prev_tuple_pool_->FreeAll();
   if (mem_pool_.get() != nullptr) mem_pool_->FreeAll();
-  if (fn_pool_.get() != nullptr) fn_pool_->FreeAll();
   ExecNode::Close(state);
 }
 
@@ -907,10 +904,4 @@ void AnalyticEvalNode::DebugString(int indentation_level, stringstream* out) con
   ExecNode::DebugString(indentation_level, out);
   *out << ")";
 }
-
-Status AnalyticEvalNode::QueryMaintenance(RuntimeState* state) {
-  AggFnEvaluator::FreeLocalAllocations(analytic_fn_evals_);
-  return ExecNode::QueryMaintenance(state);
-}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index 671eaa4..c7e7873 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -71,9 +71,6 @@ class AnalyticEvalNode : public ExecNode {
   virtual void Close(RuntimeState* state);
 
  protected:
-  /// Frees local allocations from analytic_fn_evals_
-  virtual Status QueryMaintenance(RuntimeState* state);
-
   virtual void DebugString(int indentation_level, std::stringstream* out) 
const;
 
  private:
@@ -243,11 +240,6 @@ class AnalyticEvalNode : public ExecNode {
   bool has_first_val_null_offset_;
   long first_val_null_offset_;
 
-  /// Mem pool backing allocations from fn_ctxs_. This pool must not be 
Reset() because
-  /// the memory is managed by the FreePools of the function contexts which do 
their own
-  /// bookkeeping using a pointer-based structure stored in the memory blocks 
themselves.
-  boost::scoped_ptr<MemPool> fn_pool_;
-
   /// Pools used to allocate result tuples (added to result_tuples_ and later 
returned)
   /// and window tuples (added to window_tuples_ to buffer the current 
window). Resources
   /// are transferred from curr_tuple_pool_ to prev_tuple_pool_ once it is at 
least

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index c173ed5..a319860 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -180,9 +180,10 @@ Status DataSink::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
   mem_tracker_.reset(new MemTracker(profile_, -1, name, parent_mem_tracker));
   expr_mem_tracker_.reset(
       new MemTracker(-1, Substitute("$0 Exprs", name), mem_tracker_.get(), 
false));
-  expr_mem_pool_.reset(new MemPool(expr_mem_tracker_.get()));
+  expr_perm_pool_.reset(new MemPool(expr_mem_tracker_.get()));
+  expr_results_pool_.reset(new MemPool(expr_mem_tracker_.get()));
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_exprs_, state, 
state->obj_pool(),
-      expr_mem_pool(), &output_expr_evals_));
+      expr_perm_pool_.get(), expr_results_pool_.get(), &output_expr_evals_));
   return Status::OK();
 }
 
@@ -195,7 +196,8 @@ void DataSink::Close(RuntimeState* state) {
   if (closed_) return;
   ScalarExprEvaluator::Close(output_expr_evals_, state);
   ScalarExpr::Close(output_exprs_);
-  if (expr_mem_pool() != nullptr) expr_mem_pool_->FreeAll();
+  if (expr_perm_pool_ != nullptr) expr_perm_pool_->FreeAll();
+  if (expr_results_pool_.get() != nullptr) expr_results_pool_->FreeAll();
   if (expr_mem_tracker_ != nullptr) expr_mem_tracker_->Close();
   if (mem_tracker_ != nullptr) mem_tracker_->Close();
   closed_ = true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/data-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 8e870f9..448eb8a 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -99,7 +99,6 @@ class DataSink {
 
   MemTracker* mem_tracker() const { return mem_tracker_.get(); }
   RuntimeProfile* profile() const { return profile_; }
-  MemPool* expr_mem_pool() const { return expr_mem_pool_.get(); }
   const std::vector<ScalarExprEvaluator*>& output_expr_evals() const {
     return output_expr_evals_;
   }
@@ -122,8 +121,14 @@ class DataSink {
   /// A child of 'mem_tracker_' that tracks expr allocations. Initialized in 
Prepare().
   boost::scoped_ptr<MemTracker> expr_mem_tracker_;
 
-  /// MemPool for backing data structures in expressions and their evaluators.
-  boost::scoped_ptr<MemPool> expr_mem_pool_;
+  /// MemPool for allocations made by expression evaluators in this sink that 
are
+  /// "permanent" and live until Close() is called.
+  boost::scoped_ptr<MemPool> expr_perm_pool_;
+
+  /// MemPool for allocations made by expression evaluators in this sink that 
hold
+  /// intermediate or final results of expression evaluation. Should be cleared
+  /// periodically to free accumulated memory.
+  boost::scoped_ptr<MemPool> expr_results_pool_;
 
   /// Output expressions to convert row batches onto output values.
   /// Not used in some sub-classes.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index a0c002a..b39bcbf 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -109,7 +109,8 @@ Status ExchangeNode::Open(RuntimeState* state) {
   if (is_merging_) {
     // CreateMerger() will populate its merging heap with batches from the 
stream_recvr_,
     // so it is not necessary to call FillInputRowBatch().
-    RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool()));
+    RETURN_IF_ERROR(
+        less_than_->Open(pool_, state, expr_perm_pool(), expr_results_pool()));
     RETURN_IF_ERROR(stream_recvr_->CreateMerger(*less_than_.get()));
   } else {
     RETURN_IF_ERROR(FillInputRowBatch(state));
@@ -130,11 +131,6 @@ void ExchangeNode::Close(RuntimeState* state) {
   ExecNode::Close(state);
 }
 
-Status ExchangeNode::QueryMaintenance(RuntimeState* state) {
-  if (less_than_.get() != nullptr) less_than_->FreeLocalAllocations();
-  return ExecNode::QueryMaintenance(state);
-}
-
 Status ExchangeNode::FillInputRowBatch(RuntimeState* state) {
   DCHECK(!is_merging_);
   Status ret_status;
@@ -208,6 +204,8 @@ Status ExchangeNode::GetNextMerging(RuntimeState* state, 
RowBatch* output_batch,
   DCHECK_EQ(output_batch->num_rows(), 0);
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
+  // Clear any expr result allocations made by the merger.
+  expr_results_pool_->Clear();
   RETURN_IF_ERROR(stream_recvr_->GetNext(output_batch, eos));
 
   while (num_rows_skipped_ < offset_) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/exchange-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h
index 4bb6ce3..aaf44c2 100644
--- a/be/src/exec/exchange-node.h
+++ b/be/src/exec/exchange-node.h
@@ -58,7 +58,6 @@ class ExchangeNode : public ExecNode {
   void set_num_senders(int num_senders) { num_senders_ = num_senders; }
 
  protected:
-  virtual Status QueryMaintenance(RuntimeState* state);
   virtual void DebugString(int indentation_level, std::stringstream* out) 
const;
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index b656d2b..aaca8be 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -155,16 +155,16 @@ Status ExecNode::Prepare(RuntimeState* state) {
   mem_tracker_.reset(new MemTracker(runtime_profile_, -1, 
runtime_profile_->name(),
       state->instance_mem_tracker()));
   expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), 
false));
-  expr_mem_pool_.reset(new MemPool(expr_mem_tracker_.get()));
+  expr_perm_pool_.reset(new MemPool(expr_mem_tracker_.get()));
+  expr_results_pool_.reset(new MemPool(expr_mem_tracker_.get()));
   rows_returned_counter_ = ADD_COUNTER(runtime_profile_, "RowsReturned", 
TUnit::UNIT);
   rows_returned_rate_ = runtime_profile()->AddDerivedCounter(
       ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
       bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_returned_counter_,
           runtime_profile()->total_time_counter()));
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(conjuncts_, state, pool_, 
expr_mem_pool(),
-      &conjunct_evals_));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(conjuncts_, state, pool_, 
expr_perm_pool(),
+      expr_results_pool(), &conjunct_evals_));
   DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
-  AddEvaluatorsToFree(conjunct_evals_);
   for (int i = 0; i < children_.size(); ++i) {
     RETURN_IF_ERROR(children_[i]->Prepare(state));
   }
@@ -206,7 +206,8 @@ void ExecNode::Close(RuntimeState* state) {
 
   ScalarExprEvaluator::Close(conjunct_evals_, state);
   ScalarExpr::Close(conjuncts_);
-  if (expr_mem_pool() != nullptr) expr_mem_pool_->FreeAll();
+  if (expr_perm_pool() != nullptr) expr_perm_pool_->FreeAll();
+  if (expr_results_pool() != nullptr) expr_results_pool_->FreeAll();
   if (buffer_pool_client_.is_registered()) {
     VLOG_FILE << id_ << " returning reservation " << 
resource_profile_.min_reservation;
     state->query_state()->initial_reservations()->Return(
@@ -501,18 +502,10 @@ bool ExecNode::EvalConjuncts(
 }
 
 Status ExecNode::QueryMaintenance(RuntimeState* state) {
-  ScalarExprEvaluator::FreeLocalAllocations(evals_to_free_);
+  expr_results_pool_->Clear();
   return state->CheckQueryState();
 }
 
-void ExecNode::AddEvaluatorToFree(ScalarExprEvaluator* eval) {
-  evals_to_free_.push_back(eval);
-}
-
-void ExecNode::AddEvaluatorsToFree(const vector<ScalarExprEvaluator*>& evals) {
-  for (ScalarExprEvaluator* eval : evals) AddEvaluatorToFree(eval);
-}
-
 void ExecNode::AddCodegenDisabledMessage(RuntimeState* state) {
   if (state->CodegenDisabledByQueryOption()) {
     runtime_profile()->AddCodegenMsg(false, "disabled by query option 
DISABLE_CODEGEN");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 7cba6ac..a0ed352 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -211,7 +211,8 @@ class ExecNode {
   RuntimeProfile* runtime_profile() { return runtime_profile_; }
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
   MemTracker* expr_mem_tracker() { return expr_mem_tracker_.get(); }
-  MemPool* expr_mem_pool() { return expr_mem_pool_.get(); }
+  MemPool* expr_perm_pool() { return expr_perm_pool_.get(); }
+  MemPool* expr_results_pool() { return expr_results_pool_.get(); }
 
   /// Return true if codegen was disabled by the planner for this ExecNode. 
Does not
   /// check to see if codegen was enabled for the enclosing fragment.
@@ -318,12 +319,19 @@ class ExecNode {
   /// Account for peak memory used by this node
   boost::scoped_ptr<MemTracker> mem_tracker_;
 
-  /// MemTracker used by 'expr_mem_pool_'.
+  /// MemTracker used by 'expr_perm_pool_' and 'expr_results_pool_'.
   boost::scoped_ptr<MemTracker> expr_mem_tracker_;
 
-  /// MemPool for allocating data structures used by expression evaluators in 
this node.
-  /// Created in Prepare().
-  boost::scoped_ptr<MemPool> expr_mem_pool_;
+  /// MemPool for allocations made by expression evaluators in this node that 
are
+  /// "permanent" and live until Close() is called. Created in Prepare().
+  boost::scoped_ptr<MemPool> expr_perm_pool_;
+
+  /// MemPool for allocations made by expression evaluators in this node that 
hold
+  /// intermediate or final results of expression evaluation. Should be cleared
+  /// periodically to free accumulated memory. QueryMaintenance() clears this 
pool, but
+  /// it may be appropriate for ExecNode implementation to clear it at other 
points in
+  /// execution where the memory is not needed.
+  boost::scoped_ptr<MemPool> expr_results_pool_;
 
   /// Buffer pool client for this node. Initialized with the node's minimum 
reservation
   /// in ClaimBufferReservation(). After initialization, the client must hold 
onto at
@@ -365,22 +373,11 @@ class ExecNode {
     return ExecDebugActionImpl(phase, state);
   }
 
-  /// Frees any local allocations made by evals_to_free_ and returns the 
result of
-  /// state->CheckQueryState(). Nodes should call this periodically, e.g. once 
per input
-  /// row batch. This should not be called outside the main execution thread.
-  //
-  /// Nodes may override this to add extra periodic cleanup, e.g. freeing 
other local
-  /// allocations. ExecNodes overriding this function should return
-  /// ExecNode::QueryMaintenance().
-  virtual Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT;
-
-  /// Add an expr evaluator to have its local allocations freed by 
QueryMaintenance().
-  /// Exprs that are evaluated in the main execution thread should be added. 
Exprs
-  /// evaluated in a separate thread are generally not safe to add, since a 
local
-  /// allocation may be freed while it's being used. Rather than using this 
mechanism,
-  /// threads should call FreeLocalAllocations() on local evaluators 
periodically.
-  void AddEvaluatorToFree(ScalarExprEvaluator* eval);
-  void AddEvaluatorsToFree(const std::vector<ScalarExprEvaluator*>& evals);
+  /// Clears 'expr_results_pool_' and returns the result of 
state->CheckQueryState().
+  /// Nodes should call this periodically, e.g. once per input row batch. This 
should
+  /// not be called outside the main execution thread.
+  /// TODO: IMPALA-2399: replace QueryMaintenance() - see JIRA for more 
details.
+  Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT;
 
  private:
   /// Implementation of ExecDebugAction(). This is the slow path we take when 
there is
@@ -391,10 +388,6 @@ class ExecNode {
   /// Set in ExecNode::Close(). Used to make Close() idempotent. This is not 
protected
   /// by a lock, it assumes all calls to Close() are made by the same thread.
   bool is_closed_;
-
-  /// Expr evaluators whose local allocations are safe to free in the main 
execution
-  /// thread.
-  std::vector<ScalarExprEvaluator*> evals_to_free_;
 };
 
 inline bool ExecNode::EvalPredicate(ScalarExprEvaluator* eval, TupleRow* row) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/filter-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.cc b/be/src/exec/filter-context.cc
index ecf744a..a882365 100644
--- a/be/src/exec/filter-context.cc
+++ b/be/src/exec/filter-context.cc
@@ -68,10 +68,11 @@ void FilterStats::RegisterCounterGroup(const string& key) {
 }
 
 Status FilterContext::CloneFrom(const FilterContext& from, ObjectPool* pool,
-    RuntimeState* state, MemPool* mem_pool) {
+    RuntimeState* state, MemPool* expr_perm_pool, MemPool* expr_results_pool) {
   filter = from.filter;
   stats = from.stats;
-  return from.expr_eval->Clone(pool, state, mem_pool, &expr_eval);
+  return from.expr_eval->Clone(
+      pool, state, expr_perm_pool, expr_results_pool, &expr_eval);
 }
 
 bool FilterContext::Eval(TupleRow* row) const noexcept {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/filter-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.h b/be/src/exec/filter-context.h
index 81a3889..5c232f8 100644
--- a/be/src/exec/filter-context.h
+++ b/be/src/exec/filter-context.h
@@ -100,7 +100,7 @@ struct FilterContext {
   /// Clones this FilterContext for use in a multi-threaded context (i.e. by 
scanner
   /// threads).
   Status CloneFrom(const FilterContext& from, ObjectPool* pool, RuntimeState* 
state,
-      MemPool* mem_pool);
+      MemPool* expr_perm_pool, MemPool* expr_results_pool);
 
   /// Evaluates 'row' with 'expr_eval' with the resulting value being checked
   /// against runtime filter 'filter' for matches. Returns true if 'row' finds

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 0b99cbd..c7dbfe2 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -84,14 +84,14 @@ class HashTableTest : public testing::Test {
     ASSERT_OK(build_expr->Init(desc, nullptr));
     build_exprs_.push_back(build_expr);
     ASSERT_OK(ScalarExprEvaluator::Create(build_exprs_, nullptr, &pool_, 
&mem_pool_,
-        &build_expr_evals_));
+        &mem_pool_, &build_expr_evals_));
     ASSERT_OK(ScalarExprEvaluator::Open(build_expr_evals_, nullptr));
 
     ScalarExpr* probe_expr = pool_.Add(new SlotRef(TYPE_INT, 1, true /* 
nullable */));
     ASSERT_OK(probe_expr->Init(desc, nullptr));
     probe_exprs_.push_back(probe_expr);
     ASSERT_OK(ScalarExprEvaluator::Create(probe_exprs_, nullptr, &pool_, 
&mem_pool_,
-        &probe_expr_evals_));
+        &mem_pool_, &probe_expr_evals_));
     ASSERT_OK(ScalarExprEvaluator::Open(probe_expr_evals_, nullptr));
 
     CreateTestEnv();
@@ -304,7 +304,8 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTableCtx> ht_ctx;
     EXPECT_OK(HashTableCtx::Create(&pool_, runtime_state_,
         build_exprs_, probe_exprs_, true /* stores_nulls_ */,
-        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, 
&ht_ctx));
+        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, 
&mem_pool_,
+        &mem_pool_, &ht_ctx));
     EXPECT_OK(ht_ctx->Open(runtime_state_));
 
     for (int i = 0; i < 2; ++i) {
@@ -342,7 +343,8 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTableCtx> ht_ctx;
     Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
         probe_exprs_, false /* !stores_nulls_ */,
-        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, 
&ht_ctx);
+        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, 
&mem_pool_,
+        &mem_pool_, &ht_ctx);
     EXPECT_OK(status);
     EXPECT_OK(ht_ctx->Open(runtime_state_));
     bool success;
@@ -402,7 +404,8 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTableCtx> ht_ctx;
     Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
         probe_exprs_, false /* !stores_nulls_ */,
-        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, 
&ht_ctx);
+        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, 
&mem_pool_,
+        &mem_pool_, &ht_ctx);
     EXPECT_OK(status);
     EXPECT_OK(ht_ctx->Open(runtime_state_));
 
@@ -466,7 +469,8 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTableCtx> ht_ctx;
     Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
         probe_exprs_, false /* !stores_nulls_ */,
-        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx);
+        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &mem_pool_,
+        &mem_pool_, &ht_ctx);
     EXPECT_OK(status);
 
     // Inserts num_to_add + (num_to_add^2) + (num_to_add^4) + ... + (num_to_add^20)
@@ -553,7 +557,8 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTableCtx> ht_ctx;
     Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
         probe_exprs_, false /* !stores_nulls_ */,
-        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx);
+        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_,
+        &mem_pool_, &mem_pool_, &ht_ctx);
     EXPECT_OK(status);
 
     // Insert and probe table_size different tuples. All of them are expected to be
@@ -626,7 +631,7 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTableCtx> ht_ctx;
     Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
         probe_exprs_, false /* !stores_nulls_ */, vector<bool>(build_exprs_.size(), false), 1, 0, 1,
-        &mem_pool_, &ht_ctx);
+        &mem_pool_, &mem_pool_, &mem_pool_, &ht_ctx);
     EXPECT_OK(status);
     HashTable::Iterator iter = hash_table->Begin(ht_ctx.get());
     EXPECT_TRUE(iter.AtEnd());
@@ -712,7 +717,8 @@ TEST_F(HashTableTest, HashEmpty) {
   scoped_ptr<HashTableCtx> ht_ctx;
   Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
       probe_exprs_, false /* !stores_nulls_ */,
-      vector<bool>(build_exprs_.size(), false), 1, 2, 1, &mem_pool_, &ht_ctx);
+      vector<bool>(build_exprs_.size(), false), 1, 2, 1, &mem_pool_, &mem_pool_,
+      &mem_pool_, &ht_ctx);
   EXPECT_OK(status);
   EXPECT_OK(ht_ctx->Open(runtime_state_));
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index c9090b9..a1d9c57 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -97,7 +97,8 @@ static_assert(sizeof(NULL_VALUE) >= ColumnType::MAX_CHAR_LENGTH,
 HashTableCtx::HashTableCtx(const std::vector<ScalarExpr*>& build_exprs,
     const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls,
     const std::vector<bool>& finds_nulls, int32_t initial_seed,
-    int max_levels, MemPool* mem_pool)
+    int max_levels, MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
+    MemPool* probe_expr_results_pool)
     : build_exprs_(build_exprs),
       probe_exprs_(probe_exprs),
       stores_nulls_(stores_nulls),
@@ -106,7 +107,9 @@ HashTableCtx::HashTableCtx(const std::vector<ScalarExpr*>& build_exprs,
           finds_nulls_.begin(), finds_nulls_.end(), false, std::logical_or<bool>())),
       level_(0),
       scratch_row_(NULL),
-      mem_pool_(mem_pool) {
+      expr_perm_pool_(expr_perm_pool),
+      build_expr_results_pool_(build_expr_results_pool),
+      probe_expr_results_pool_(probe_expr_results_pool) {
   DCHECK(!finds_some_nulls_ || stores_nulls_);
   // Compute the layout and buffer size to store the evaluated expr results
   DCHECK_EQ(build_exprs_.size(), probe_exprs_.size());
@@ -131,22 +134,24 @@ Status HashTableCtx::Init(ObjectPool* pool, RuntimeState* state, int num_build_t
     return Status(Substitute("Failed to allocate $0 bytes for scratch row of "
         "HashTableCtx.", scratch_row_size));
   }
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(build_exprs_, state, pool, mem_pool_,
-      &build_expr_evals_));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(build_exprs_, state, pool, expr_perm_pool_,
+      build_expr_results_pool_, &build_expr_evals_));
   DCHECK_EQ(build_exprs_.size(), build_expr_evals_.size());
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(probe_exprs_, state, pool, mem_pool_,
-      &probe_expr_evals_));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(probe_exprs_, state, pool, expr_perm_pool_,
+      probe_expr_results_pool_, &probe_expr_evals_));
   DCHECK_EQ(probe_exprs_.size(), probe_expr_evals_.size());
-  return expr_values_cache_.Init(state, mem_pool_->mem_tracker(), build_exprs_);
+  return expr_values_cache_.Init(state, expr_perm_pool_->mem_tracker(), build_exprs_);
 }
 
 Status HashTableCtx::Create(ObjectPool* pool, RuntimeState* state,
     const std::vector<ScalarExpr*>& build_exprs,
     const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls,
     const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
-    int num_build_tuples, MemPool* mem_pool, scoped_ptr<HashTableCtx>* ht_ctx) {
+    int num_build_tuples, MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
+    MemPool* probe_expr_results_pool, scoped_ptr<HashTableCtx>* ht_ctx) {
   ht_ctx->reset(new HashTableCtx(build_exprs, probe_exprs, stores_nulls,
-      finds_nulls, initial_seed, max_levels, mem_pool));
+      finds_nulls, initial_seed, max_levels, expr_perm_pool,
+      build_expr_results_pool, probe_expr_results_pool));
   return (*ht_ctx)->Init(pool, state, num_build_tuples);
 }
 
@@ -159,24 +164,11 @@ Status HashTableCtx::Open(RuntimeState* state) {
 void HashTableCtx::Close(RuntimeState* state) {
   free(scratch_row_);
   scratch_row_ = NULL;
-  expr_values_cache_.Close(mem_pool_->mem_tracker());
+  expr_values_cache_.Close(expr_perm_pool_->mem_tracker());
   ScalarExprEvaluator::Close(build_expr_evals_, state);
   ScalarExprEvaluator::Close(probe_expr_evals_, state);
 }
 
-void HashTableCtx::FreeBuildLocalAllocations() {
-  ScalarExprEvaluator::FreeLocalAllocations(build_expr_evals_);
-}
-
-void HashTableCtx::FreeProbeLocalAllocations() {
-  ScalarExprEvaluator::FreeLocalAllocations(probe_expr_evals_);
-}
-
-void HashTableCtx::FreeLocalAllocations() {
-  FreeBuildLocalAllocations();
-  FreeProbeLocalAllocations();
-}
-
 uint32_t HashTableCtx::Hash(const void* input, int len, uint32_t hash) const {
   /// Use CRC hash at first level for better performance. Switch to murmur hash at
   /// subsequent levels since CRC doesn't randomize well with different seed inputs.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index d764640..91fb968 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -118,7 +118,8 @@ class HashTableCtx {
       const std::vector<ScalarExpr*>& build_exprs,
       const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls,
       const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
-      int num_build_tuples, MemPool* mem_pool, boost::scoped_ptr<HashTableCtx>* ht_ctx);
+      int num_build_tuples, MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
+      MemPool* probe_expr_results_pool, boost::scoped_ptr<HashTableCtx>* ht_ctx);
 
   /// Initialize the build and probe expression evaluators.
   Status Open(RuntimeState* state);
@@ -126,13 +127,6 @@ class HashTableCtx {
   /// Call to cleanup any resources allocated by the expression evaluators.
   void Close(RuntimeState* state);
 
-  /// Free local allocations made by build and probe expression evaluators respectively.
-  void FreeBuildLocalAllocations();
-  void FreeProbeLocalAllocations();
-
-  /// Free local allocations of both build and probe expression evaluators.
-  void FreeLocalAllocations();
-
   void set_level(int level);
 
   int ALWAYS_INLINE level() const { return level_; }
@@ -398,9 +392,18 @@ class HashTableCtx {
   ///  - initial_seed: initial seed value to use when computing hashes for rows with
   ///        level 0. Other levels have their seeds derived from this seed.
   ///  - max_levels: the max levels we will hash with.
-  ///  - mem_pool: the MemPool which the expression evaluators allocate from. Owned by the
-  ///        exec node which owns this hash table context. Memory usage of the expression
-  ///        value cache is charged against its MemTracker.
+  ///  - expr_perm_pool: the MemPool from which the expression evaluators make permanent
+  ///        allocations that live until Close(). Owned by the exec node which owns this
+  ///        hash table context. Memory usage of the expression value cache is charged
+  ///        against this MemPool's tracker.
+  ///  - build_expr_results_pool: the MemPool from which the expression evaluators make
+  ///        allocations to hold expression results. Cached build expression values may
+  ///        reference memory in this pool. Owned by the exec node which owns this hash
+  ///        table context.
+  ///  - probe_expr_results_pool: the MemPool from which the expression evaluators make
+  ///        allocations to hold expression results. Cached probe expression values may
+  ///        reference memory in this pool. Owned by the exec node which owns this hash
+  ///        table context.
   ///
   /// TODO: stores_nulls is too coarse: for a hash table in which some columns are joined
   ///       with '<=>' and others with '=', stores_nulls could distinguish between columns
@@ -409,7 +412,8 @@ class HashTableCtx {
   HashTableCtx(const std::vector<ScalarExpr*>& build_exprs,
       const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls,
       const std::vector<bool>& finds_nulls, int32_t initial_seed,
-      int max_levels, MemPool* mem_pool);
+      int max_levels, MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
+      MemPool* probe_expr_results_pool);
 
   /// Allocate various buffers for storing expression evaluation results, hash values,
   /// null bits etc. Also allocate evaluators for the build and probe expressions and
@@ -513,9 +517,15 @@ class HashTableCtx {
   /// Scratch buffer to generate rows on the fly.
   TupleRow* scratch_row_;
 
-  /// MemPool for 'build_expr_evals_' and 'probe_expr_evals_' to allocate from.
-  /// Not owned.
-  MemPool* mem_pool_;
+  /// MemPool for 'build_expr_evals_' and 'probe_expr_evals_' to allocate expr-managed
+  /// memory from. Not owned.
+  MemPool* expr_perm_pool_;
+
+  /// MemPools for allocations by 'build_expr_evals_' and 'probe_expr_evals_' that hold
+  /// results of expr evaluation. Not owned. The owner of these pools is responsible for
+  /// clearing them when results from the respective expr evaluators are no longer needed.
+  MemPool* build_expr_results_pool_;
+  MemPool* probe_expr_results_pool_;
 };
 
 /// The hash table consists of a contiguous array of buckets that contain a pointer to the
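
The split between expr_perm_pool and the two *_expr_results_pool arguments
documented above can be illustrated with a toy bump allocator. This is only a
sketch under stated assumptions: ToyPool and its methods are hypothetical names
invented for illustration, not Impala's MemPool, and memory tracking, alignment
and error handling are deliberately left out.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a bump allocator. NOT Impala's MemPool.
class ToyPool {
 public:
  explicit ToyPool(size_t chunk_size = 1024) : chunk_size_(chunk_size) {}

  // Carves 'size' bytes out of the current chunk, moving to the next retained
  // chunk or adding a new one when the request does not fit.
  uint8_t* Allocate(size_t size) {
    while (current_ < chunks_.size() && offset_ + size > chunks_[current_].size()) {
      ++current_;
      offset_ = 0;
    }
    if (current_ == chunks_.size()) {
      chunks_.emplace_back(std::max(size, chunk_size_));
      offset_ = 0;
    }
    uint8_t* result = chunks_[current_].data() + offset_;
    offset_ += size;
    bytes_in_use_ += size;
    return result;
  }

  // Invalidates every allocation at once but keeps the chunks for reuse.
  void Clear() {
    current_ = 0;
    offset_ = 0;
    bytes_in_use_ = 0;
  }

  // Invalidates every allocation and releases the chunks.
  void FreeAll() {
    chunks_.clear();
    Clear();
  }

  size_t bytes_in_use() const { return bytes_in_use_; }
  size_t num_chunks() const { return chunks_.size(); }

 private:
  size_t chunk_size_;
  std::vector<std::vector<uint8_t>> chunks_;
  size_t current_ = 0;       // Index of the chunk being carved up.
  size_t offset_ = 0;        // Next free byte within that chunk.
  size_t bytes_in_use_ = 0;  // Bytes handed out since the last Clear().
};
```

In this sketch an owner would keep one ToyPool for state that must live until
Close() and a separate one for expression results, calling Clear() on the
results pool once the results are no longer referenced. Clearing the results
pool leaves allocations made from the other pool untouched.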

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hbase-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index c957793..f393d8d 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -69,7 +69,7 @@ Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_track
 
 Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
-  ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_);
+  expr_results_pool_->Clear();
   RETURN_IF_ERROR(state->CheckQueryState());
   // Since everything is set up just forward everything to the writer.
   RETURN_IF_ERROR(hbase_table_writer_->AppendRows(batch));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 57f1e24..6cfaee0 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -217,8 +217,8 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   // Clone the min/max statistics conjuncts.
   RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
-      expr_mem_pool_.get(), scan_node_->min_max_conjunct_evals(),
-      &min_max_conjunct_evals_));
+      expr_perm_pool_.get(), context_->expr_results_pool(),
+      scan_node_->min_max_conjunct_evals(), &min_max_conjunct_evals_));
 
   for (int i = 0; i < context->filter_ctxs().size(); ++i) {
     const FilterContext* ctx = &context->filter_ctxs()[i];
@@ -605,8 +605,8 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
     }
   }
 
-  // Free any local allocations accumulated during conjunct evaluation.
-  ScalarExprEvaluator::FreeLocalAllocations(min_max_conjunct_evals_);
+  // Free any expr result allocations accumulated during conjunct evaluation.
+  context_->expr_results_pool()->Clear();
   return Status::OK();
 }
 
@@ -918,9 +918,9 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
     bool column_has_match = false;
     for (int dict_idx = 0; dict_idx < dictionary->num_entries(); ++dict_idx) {
       if (dict_idx % 1024 == 0) {
-        // Don't let local allocations accumulate too much for large dictionaries or
+        // Don't let expr result allocations accumulate too much for large dictionaries or
         // many row groups.
-        ScalarExprEvaluator::FreeLocalAllocations(dict_filter_conjunct_evals);
+        context_->expr_results_pool()->Clear();
       }
       dictionary->GetValue(dict_idx, slot);
 
@@ -934,8 +934,8 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
         break;
       }
     }
-    // Free all local allocations now that we're done with the filter.
-    ScalarExprEvaluator::FreeLocalAllocations(dict_filter_conjunct_evals);
+    // Free all expr result allocations now that we're done with the filter.
+    context_->expr_results_pool()->Clear();
 
     if (!column_has_match) {
       // The column contains no value that matches the conjunct. The row group
@@ -1042,20 +1042,12 @@ Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
   // Store UDF error in thread local storage or make UDF return status so it can merge
   // with parse_status_.
   RETURN_IF_ERROR(state_->GetQueryStatus());
-  // Free local expr allocations made when evaluating conjuncts for this batch.
-  FreeLocalAllocationsForConjuncts();
+  // Clear expr result allocations for this thread to avoid accumulating too much
+  // memory from evaluating the scanner conjuncts.
+  context_->expr_results_pool()->Clear();
   return Status::OK();
 }
 
-void HdfsParquetScanner::FreeLocalAllocationsForConjuncts() {
-  for (const auto& kv: conjunct_evals_map_) {
-    ScalarExprEvaluator::FreeLocalAllocations(kv.second);
-  }
-  for (const FilterContext* filter_ctx : filter_ctxs_) {
-    filter_ctx->expr_eval->FreeLocalAllocations();
-  }
-}
-
 int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
   // This function must not be called when the output batch is already full. As long as
   // we always call CommitRows() after TransferScratchTuples(), the output batch can

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 754737d..5f67036 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -650,11 +650,6 @@ class HdfsParquetScanner : public HdfsScanner {
   /// no values that pass the relevant conjuncts, then the row group can be skipped.
   Status EvalDictionaryFilters(const parquet::RowGroup& row_group,
       bool* skip_row_group) WARN_UNUSED_RESULT;
-
-  /// Free local allocations made when evaluating conjuncts over each row. Does not free
-  /// local allocations made when evaluated conjuncts for row groups, pages, etc. Those
-  /// should be freed separately after they are evaluated.
-  void FreeLocalAllocationsForConjuncts();
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 5582ead..59d4ca2 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -142,14 +142,14 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
     } else {
       DCHECK(conjunct_evals_map_[entry.first].empty());
       RETURN_IF_ERROR(ScalarExprEvaluator::Create(entry.second, state, pool_,
-          expr_mem_pool(), &conjunct_evals_map_[entry.first]));
+          expr_perm_pool(), expr_results_pool(), &conjunct_evals_map_[entry.first]));
     }
   }
 
   // Prepare min max statistics conjuncts.
   if (min_max_tuple_id_ != -1) {
     RETURN_IF_ERROR(ScalarExprEvaluator::Create(min_max_conjuncts_, state, pool_,
-        expr_mem_pool(), &min_max_conjunct_evals_));
+        expr_perm_pool(), expr_results_pool(), &min_max_conjunct_evals_));
   }
 
   // One-time initialization of state that is constant across scan ranges

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 53aa026..8d4efec 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -88,7 +88,8 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
     int64_t partition_id = metadata->partition_id;
     HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
     scanner_ctx_.reset(new ScannerContext(
-        runtime_state_, this, partition, scan_range_, filter_ctxs()));
+        runtime_state_, this, partition, scan_range_, filter_ctxs(),
+        expr_results_pool()));
     Status status = CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_);
     if (!status.ok()) {
       DCHECK(scanner_ == NULL);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 64eece3..6aeb6a9 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -386,16 +386,20 @@ void HdfsScanNode::ScannerThread() {
   // contexts as the embedded expression evaluators may allocate from it and MemPool
   // is not thread safe.
   MemPool filter_mem_pool(expr_mem_tracker());
+  MemPool expr_results_pool(expr_mem_tracker());
   vector<FilterContext> filter_ctxs;
   Status filter_status = Status::OK();
   for (auto& filter_ctx: filter_ctxs_) {
     FilterContext filter;
-    filter_status = filter.CloneFrom(filter_ctx, pool_, runtime_state_, &filter_mem_pool);
+    filter_status = filter.CloneFrom(filter_ctx, pool_, runtime_state_, &filter_mem_pool,
+        &expr_results_pool);
     if (!filter_status.ok()) break;
     filter_ctxs.push_back(filter);
   }
 
   while (!done_) {
+    // Prevent memory accumulating across scan ranges.
+    expr_results_pool.Clear();
     {
       // Check if we have enough resources (thread token and memory) to keep using
       // this thread.
@@ -435,7 +439,7 @@ void HdfsScanNode::ScannerThread() {
     if (status.ok() && scan_range != NULL) {
       // Got a scan range. Process the range end to end (in this thread).
       status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
-          scan_range);
+          &expr_results_pool, scan_range);
     }
 
     if (!status.ok()) {
@@ -477,6 +481,7 @@ exit:
   runtime_state_->resource_pool()->ReleaseThreadToken(false);
   for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_);
   filter_mem_pool.FreeAll();
+  expr_results_pool.FreeAll();
 }
 
 namespace {
@@ -492,8 +497,7 @@ bool FileFormatIsSequenceBased(THdfsFileFormat::type format) {
 }
 
 Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
-    DiskIoMgr::ScanRange* scan_range) {
-
+    MemPool* expr_results_pool, DiskIoMgr::ScanRange* scan_range) {
   DCHECK(scan_range != NULL);
 
   ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
@@ -519,7 +523,8 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     }
   }
 
-  ScannerContext context(runtime_state_, this, partition, scan_range, filter_ctxs);
+  ScannerContext context(
+      runtime_state_, this, partition, scan_range, filter_ctxs, expr_results_pool);
   scoped_ptr<HdfsScanner> scanner;
   Status status = CreateAndOpenScanner(partition, &context, &scanner);
   if (!status.ok()) {
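
The ScannerThread() change above gives each scanner thread its own results
pool and clears it at the top of every loop iteration. A minimal sketch of why
that bounds memory, assuming a hypothetical CountingPool that only counts bytes
(it is not Impala's MemPool and hands out no memory) and a hypothetical helper
PeakBytesForRanges():

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in that only tracks how many bytes are outstanding.
struct CountingPool {
  size_t bytes_in_use = 0;
  void Allocate(size_t size) { bytes_in_use += size; }
  void Clear() { bytes_in_use = 0; }
};

// Processes every range with one pool owned by the calling thread and returns
// the largest number of bytes the pool held at any point. Each inner vector
// lists the sizes of the result allocations made while processing one range.
size_t PeakBytesForRanges(const std::vector<std::vector<size_t>>& ranges) {
  CountingPool expr_results_pool;  // Local to this thread, so no locking is needed.
  size_t peak = 0;
  for (const auto& range : ranges) {
    // Drop results left over from the previous range before starting the next.
    expr_results_pool.Clear();
    for (size_t alloc_size : range) {
      expr_results_pool.Allocate(alloc_size);
      peak = std::max(peak, expr_results_pool.bytes_in_use);
    }
  }
  return peak;
}
```

With the Clear() call in place, the peak is set by the most expensive single
range rather than by the sum over all ranges the thread processes.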

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 782f530..30435c2 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -166,7 +166,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
   /// in this split.
   Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
-      DiskIoMgr::ScanRange* scan_range) WARN_UNUSED_RESULT;
+      MemPool* expr_results_pool, DiskIoMgr::ScanRange* scan_range) WARN_UNUSED_RESULT;
 
   /// Returns true if there is enough memory (against the mem tracker limits) to
   /// have a scanner thread.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index e81a72a..e0f3016 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -60,7 +60,7 @@ const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner";
 HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
     : scan_node_(scan_node),
       state_(state),
-      expr_mem_pool_(new MemPool(scan_node->expr_mem_tracker())),
+      expr_perm_pool_(new MemPool(scan_node->expr_mem_tracker())),
       template_tuple_pool_(new MemPool(scan_node->mem_tracker())),
       tuple_byte_size_(scan_node->tuple_desc()->byte_size()),
       data_buffer_pool_(new MemPool(scan_node->mem_tracker())) {
@@ -84,7 +84,7 @@ Status HdfsScanner::Open(ScannerContext* context) {
   // caller.
   for (const auto& entry: scan_node_->conjuncts_map()) {
     RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, scan_node_->runtime_state(),
-        expr_mem_pool_.get(), entry.second,
+        expr_perm_pool_.get(), context_->expr_results_pool(), entry.second,
         &conjunct_evals_map_[entry.first]));
   }
   DCHECK(conjunct_evals_map_.find(scan_node_->tuple_desc()->id()) !=
@@ -145,7 +145,8 @@ void HdfsScanner::CloseInternal() {
   for (auto& entry : conjunct_evals_map_) {
     ScalarExprEvaluator::Close(entry.second, state_);
   }
-  expr_mem_pool_->FreeAll();
+  expr_perm_pool_->FreeAll();
+  context_->expr_results_pool()->FreeAll();
   obj_pool_.Clear();
   stream_ = nullptr;
   context_->ClearStreams();
@@ -199,11 +200,9 @@ Status HdfsScanner::CommitRows(int num_rows, RowBatch* row_batch) {
   if (context_->cancelled()) return Status::CANCELLED;
   // Check for UDF errors.
   RETURN_IF_ERROR(state_->GetQueryStatus());
-  // Free local expr allocations for this thread to avoid accumulating too much
+  // Clear expr result allocations for this thread to avoid accumulating too much
   // memory from evaluating the scanner conjuncts.
-  for (const auto& entry: conjunct_evals_map_) {
-    ScalarExprEvaluator::FreeLocalAllocations(entry.second);
-  }
+  context_->expr_results_pool()->Clear();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index f2f3407..2005870 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -205,9 +205,9 @@ class HdfsScanner {
   /// Starts as false and is set to true in Close().
   bool is_closed_ = false;
 
-  /// MemPool used for expression evaluators in this scanner. Need to be local
-  /// to each scanner as MemPool is not thread safe.
-  boost::scoped_ptr<MemPool> expr_mem_pool_;
+  /// MemPool used for expr-managed memory in expression evaluators in this scanner.
+  /// Need to be local to each scanner as MemPool is not thread safe.
+  boost::scoped_ptr<MemPool> expr_perm_pool_;
 
   /// Clones of the conjuncts' evaluators in scan_node_->conjuncts_map().
   /// Each scanner has its own ScalarExprEvaluators so the conjuncts can be safely
@@ -311,7 +311,7 @@ class HdfsScanner {
 
   /// Commits 'num_rows' to 'row_batch'. Advances 'tuple_mem_' and 'tuple_' accordingly.
   /// Attaches completed resources from 'context_' to 'row_batch' if necessary.
-  /// Frees local expr allocations. Returns non-OK if 'context_' is cancelled or the
+  /// Frees expr result allocations. Returns non-OK if 'context_' is cancelled or the
   /// query status in 'state_' is non-OK.
   Status CommitRows(int num_rows, RowBatch* row_batch) WARN_UNUSED_RESULT;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index e488717..e96ceb7 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -94,7 +94,8 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
   unique_id_str_ = PrintId(state->fragment_instance_id(), "-");
   SCOPED_TIMER(profile()->total_time_counter());
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_key_exprs_, state,
-      state->obj_pool(), expr_mem_pool(), &partition_key_expr_evals_));
+      state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(),
+      &partition_key_expr_evals_));
 
   // TODO: Consider a system-wide random number generator, initialised in a single place.
   ptime now = microsec_clock::local_time();
@@ -599,8 +600,7 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, const Tuple
 
 Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
-  ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_);
-  ScalarExprEvaluator::FreeLocalAllocations(partition_key_expr_evals_);
+  expr_results_pool_->Clear();
   RETURN_IF_ERROR(state->CheckQueryState());
   // We don't do any work for an empty batch.
   if (batch->num_rows() == 0) return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 9c9ce73..deb9c84 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -63,7 +63,8 @@ const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT";
 KuduScanner::KuduScanner(KuduScanNodeBase* scan_node, RuntimeState* state)
   : scan_node_(scan_node),
     state_(state),
-    expr_mem_pool_(new MemPool(scan_node->expr_mem_tracker())),
+    expr_perm_pool_(new MemPool(scan_node->expr_mem_tracker())),
+    expr_results_pool_(new MemPool(scan_node->expr_mem_tracker())),
     cur_kudu_batch_num_read_(0),
     last_alive_time_micros_(0) {
 }
@@ -74,8 +75,8 @@ Status KuduScanner::Open() {
     if (slot->type().type != TYPE_TIMESTAMP) continue;
     timestamp_slots_.push_back(slot);
   }
-  return ScalarExprEvaluator::Clone(&obj_pool_, state_, expr_mem_pool_.get(),
-      scan_node_->conjunct_evals(), &conjunct_evals_);
+  return ScalarExprEvaluator::Clone(&obj_pool_, state_, expr_perm_pool_.get(),
+      expr_results_pool_.get(), scan_node_->conjunct_evals(), &conjunct_evals_);
 }
 
 void KuduScanner::KeepKuduScannerAlive() {
@@ -131,7 +132,8 @@ Status KuduScanner::GetNext(RowBatch* row_batch, bool* eos) {
 void KuduScanner::Close() {
   if (scanner_) CloseCurrentClientScanner();
   ScalarExprEvaluator::Close(conjunct_evals_, state_);
-  expr_mem_pool_->FreeAll();
+  expr_perm_pool_->FreeAll();
+  expr_results_pool_->FreeAll();
 }
 
 Status KuduScanner::OpenNextScanToken(const string& scan_token)  {
@@ -245,7 +247,7 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
     // Move to the next tuple in the tuple buffer.
     *tuple_mem = next_tuple(*tuple_mem);
   }
-  ScalarExprEvaluator::FreeLocalAllocations(conjunct_evals_);
+  expr_results_pool_->Clear();
 
   // Check the status in case an error status was set during conjunct evaluation.
   return state_->GetQueryStatus();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/kudu-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h
index e327e7a..125881c 100644
--- a/be/src/exec/kudu-scanner.h
+++ b/be/src/exec/kudu-scanner.h
@@ -90,9 +90,14 @@ class KuduScanner {
   /// For objects which have the same life time as the scanner.
   ObjectPool obj_pool_;
 
-  /// MemPool used for expression evaluators in this scanner. Need to be local
-  /// to each scanner as MemPool is not thread safe.
-  boost::scoped_ptr<MemPool> expr_mem_pool_;
+  /// MemPool used for expr-managed allocations in expression evaluators in this scanner.
+  /// Need to be local to each scanner as MemPool is not thread safe.
+  boost::scoped_ptr<MemPool> expr_perm_pool_;
+
+  /// MemPool used for allocations by expression evaluators in this scanner that hold
+  /// results of expression evaluation. Need to be local to each scanner as MemPool is
+  /// not thread safe.
+  boost::scoped_ptr<MemPool> expr_results_pool_;
 
   /// The kudu::client::KuduScanner for the current scan token. A new KuduScanner is
   /// created for each scan token using KuduScanToken::DeserializeIntoScanner().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index f01a373..c5abf7b 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -213,7 +213,7 @@ kudu::client::KuduWriteOperation* KuduTableSink::NewWriteOp() {
 
 Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
-  ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_);
+  expr_results_pool_->Clear();
   RETURN_IF_ERROR(state->CheckQueryState());
   const KuduSchema& table_schema = table_->schema();
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index e80d230..c5ecf53 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -66,7 +66,7 @@ Status NestedLoopJoinNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(BlockingJoinNode::Open(state));
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(join_conjunct_evals_, state));
 
-  // Check for errors and free local allocations before opening children.
+  // Check for errors and free expr result allocations before opening children.
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
 
@@ -99,7 +99,7 @@ Status NestedLoopJoinNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(join_conjuncts_, state,
-      pool_, expr_mem_pool(), &join_conjunct_evals_));
+      pool_, expr_perm_pool(), expr_results_pool(), &join_conjunct_evals_));
   builder_.reset(new NljBuilder(child(1)->row_desc(), state));
   RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
   runtime_profile()->PrependChild(builder_->profile());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partial-sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index 107c29f..68d98a4 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -57,11 +57,10 @@ Status PartialSortNode::Init(const TPlanNode& tnode, RuntimeState* state) {
 Status PartialSortNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
-  less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
-  sorter_.reset(new Sorter(*less_than_, sort_tuple_exprs_, &row_descriptor_,
-      mem_tracker(), &buffer_pool_client_, resource_profile_.spillable_buffer_size,
-      runtime_profile(), state, id(), false));
-  RETURN_IF_ERROR(sorter_->Prepare(pool_, expr_mem_pool()));
+  sorter_.reset(new Sorter(ordering_exprs_, is_asc_order_, nulls_first_,
+      sort_tuple_exprs_, &row_descriptor_, mem_tracker(), &buffer_pool_client_,
+      resource_profile_.spillable_buffer_size, runtime_profile(), state, id(), false));
+  RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
   AddCodegenDisabledMessage(state);
   input_batch_.reset(
@@ -73,14 +72,13 @@ void PartialSortNode::Codegen(RuntimeState* state) {
   DCHECK(state->ShouldCodegen());
   ExecNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
-  Status codegen_status = less_than_->Codegen(state);
+  Status codegen_status = sorter_->Codegen(state);
   runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
 }
 
 Status PartialSortNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
-  RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool()));
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
   RETURN_IF_ERROR(child(0)->Open(state));
@@ -151,7 +149,6 @@ Status PartialSortNode::Reset(RuntimeState* state) {
 void PartialSortNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   child(0)->Close(state);
-  if (less_than_.get() != nullptr) less_than_->Close(state);
   if (sorter_ != nullptr) sorter_->Close(state);
   sorter_.reset();
   ScalarExpr::Close(ordering_exprs_);
@@ -160,11 +157,6 @@ void PartialSortNode::Close(RuntimeState* state) {
   ExecNode::Close(state);
 }
 
-Status PartialSortNode::QueryMaintenance(RuntimeState* state) {
-  sorter_->FreeLocalAllocations();
-  return ExecNode::QueryMaintenance(state);
-}
-
 void PartialSortNode::DebugString(int indentation_level, stringstream* out) const {
   *out << string(indentation_level * 2, ' ');
   *out << "PartialSortNode(" << ScalarExpr::DebugString(ordering_exprs_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partial-sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
index d40d653..421df31 100644
--- a/be/src/exec/partial-sort-node.h
+++ b/be/src/exec/partial-sort-node.h
@@ -54,13 +54,9 @@ class PartialSortNode : public ExecNode {
   virtual void Close(RuntimeState* state);
 
  protected:
-  virtual Status QueryMaintenance(RuntimeState* state);
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
  private:
-  /// Compares tuples according to 'ordering_exprs'.
-  boost::scoped_ptr<TupleRowComparator> less_than_;
-
   /// Expressions and parameters used for tuple comparison.
   std::vector<ScalarExpr*> ordering_exprs_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 3824a72..4eaebb6 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -186,8 +186,7 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   state_ = state;
 
-  mem_pool_.reset(new MemPool(mem_tracker()));
-  agg_fn_pool_.reset(new MemPool(expr_mem_tracker()));
+  singleton_tuple_pool_.reset(new MemPool(mem_tracker()));
 
   ht_resize_timer_ = ADD_TIMER(runtime_profile(), "HTResizeTime");
   get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime");
@@ -218,13 +217,14 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
         "MaxPartitionLevel", TUnit::UNIT);
   }
 
-  RETURN_IF_ERROR(AggFnEvaluator::Create(agg_fns_, state, pool_, agg_fn_pool_.get(),
-      &agg_fn_evals_));
+  RETURN_IF_ERROR(AggFnEvaluator::Create(agg_fns_, state, pool_, expr_perm_pool(),
+      expr_results_pool(), &agg_fn_evals_));
 
   if (!grouping_exprs_.empty()) {
     RETURN_IF_ERROR(HashTableCtx::Create(pool_, state, build_exprs_,
         grouping_exprs_, true, vector<bool>(build_exprs_.size(), true),
-        state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_mem_pool(), &ht_ctx_));
+        state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_perm_pool(),
+        expr_results_pool(), expr_results_pool(), &ht_ctx_));
   }
   AddCodegenDisabledMessage(state);
   return Status::OK();
@@ -263,7 +263,7 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
     // Create the single output tuple for this non-grouping agg. This must happen after
     // opening the aggregate evaluators.
     singleton_output_tuple_ =
-        ConstructSingletonOutputTuple(agg_fn_evals_, mem_pool_.get());
+        ConstructSingletonOutputTuple(agg_fn_evals_, singleton_tuple_pool_.get());
     // Check for failures during AggFnEvaluator::Init().
     RETURN_IF_ERROR(state_->GetQueryStatus());
     singleton_output_tuple_returned_ = false;
@@ -343,62 +343,8 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch,
-    bool* eos) {
-  int first_row_idx = row_batch->num_rows();
-  RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos));
-  RETURN_IF_ERROR(HandleOutputStrings(row_batch, first_row_idx));
-  return Status::OK();
-}
-
-Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
-    int first_row_idx) {
-  if (!needs_finalize_ && !needs_serialize_) return Status::OK();
-  // String data returned by Serialize() or Finalize() is from local expr allocations in
-  // the agg function contexts, and will be freed on the next GetNext() call by
-  // FreeLocalAllocations(). The data either needs to be copied out now or sent up the
-  // plan and copied out by a blocking ancestor. (See IMPALA-3311)
-  for (const AggFn* agg_fn : agg_fns_) {
-    const SlotDescriptor& slot_desc = agg_fn->output_slot_desc();
-    DCHECK(!slot_desc.type().IsCollectionType()) << "producing collections NYI";
-    if (!slot_desc.type().IsVarLenStringType()) continue;
-    if (IsInSubplan()) {
-      // Copy string data to the row batch's pool. This is more efficient than
-      // MarkNeedsDeepCopy() in a subplan since we are likely producing many small
-      // batches.
-      RETURN_IF_ERROR(CopyStringData(slot_desc, row_batch,
-          first_row_idx, row_batch->tuple_data_pool()));
-    } else {
-      row_batch->MarkNeedsDeepCopy();
-      break;
-    }
-  }
-  return Status::OK();
-}
-
-Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_desc,
-    RowBatch* row_batch, int first_row_idx, MemPool* pool) {
-  DCHECK(slot_desc.type().IsVarLenStringType());
-  DCHECK_EQ(row_batch->row_desc()->tuple_descriptors().size(), 1);
-  FOREACH_ROW(row_batch, first_row_idx, batch_iter) {
-    Tuple* tuple = batch_iter.Get()->GetTuple(0);
-    StringValue* sv = reinterpret_cast<StringValue*>(
-        tuple->GetSlot(slot_desc.tuple_offset()));
-    if (sv == NULL || sv->len == 0) continue;
-    char* new_ptr = reinterpret_cast<char*>(pool->TryAllocate(sv->len));
-    if (UNLIKELY(new_ptr == NULL)) {
-      string details = Substitute("Cannot perform aggregation at node with id $0."
-          " Failed to allocate $1 output bytes.", id_, sv->len);
-      return pool->mem_tracker()->MemLimitExceeded(state_, details, sv->len);
-    }
-    memcpy(new_ptr, sv->ptr, sv->len);
-    sv->ptr = new_ptr;
-  }
-  return Status::OK();
-}
-
-Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state,
-    RowBatch* row_batch, bool* eos) {
+Status PartitionedAggregationNode::GetNext(
+    RuntimeState* state, RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
@@ -435,6 +381,11 @@ void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
   DCHECK(grouping_exprs_.empty());
   int row_idx = row_batch->AddRow();
   TupleRow* row = row_batch->GetRow(row_idx);
+  // The output row batch may reference memory allocated by Serialize() or Finalize(),
+  // allocating that memory directly from the row batch's pool means we can safely return
+  // the batch.
+  vector<ScopedResultsPool> allocate_from_batch_pool =
+      ScopedResultsPool::Create(agg_fn_evals_, row_batch->tuple_data_pool());
   Tuple* output_tuple = GetOutputTuple(agg_fn_evals_,
       singleton_output_tuple_, row_batch->tuple_data_pool());
   row->SetTuple(0, output_tuple);
@@ -446,7 +397,7 @@ void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
   }
   // Keep the current chunk to amortize the memory allocation over a series
   // of Reset()/Open()/GetNext()* calls.
-  row_batch->tuple_data_pool()->AcquireData(mem_pool_.get(), true);
+  row_batch->tuple_data_pool()->AcquireData(singleton_tuple_pool_.get(), true);
   // This node no longer owns the memory for singleton_output_tuple_.
   singleton_output_tuple_ = NULL;
 }
@@ -471,10 +422,16 @@ Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
   }
 
   SCOPED_TIMER(get_results_timer_);
+
+  // The output row batch may reference memory allocated by Serialize() or Finalize(),
+  // allocating that memory directly from the row batch's pool means we can safely return
+  // the batch.
+  vector<ScopedResultsPool> allocate_from_batch_pool = ScopedResultsPool::Create(
+        output_partition_->agg_fn_evals, row_batch->tuple_data_pool());
   int count = 0;
   const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
   // Keeping returning rows from the current partition.
-  while (!output_iterator_.AtEnd()) {
+  while (!output_iterator_.AtEnd() && !row_batch->AtCapacity()) {
     // This loop can go on for a long time if the conjuncts are very selective. Do query
     // maintenance every N iterations.
     if ((count++ & (N - 1)) == 0) {
@@ -493,9 +450,7 @@ Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
     if (ExecNode::EvalConjuncts(conjunct_evals_.data(), conjuncts_.size(), row)) {
       row_batch->CommitLastRow();
       ++num_rows_returned_;
-      if (ReachedLimit() || row_batch->AtCapacity()) {
-        break;
-      }
+      if (ReachedLimit()) break;
     }
   }
 
@@ -634,17 +589,22 @@ void PartitionedAggregationNode::CleanupHashTbl(
     // Finalize() requires a dst tuple but we don't actually need the result,
     // so allocate a single dummy tuple to avoid accumulating memory.
     Tuple* dummy_dst = NULL;
-    dummy_dst = Tuple::Create(output_tuple_desc_->byte_size(), mem_pool_.get());
+    dummy_dst = Tuple::Create(
+        output_tuple_desc_->byte_size(), singleton_tuple_pool_.get());
     while (!it.AtEnd()) {
       Tuple* tuple = it.GetTuple();
       AggFnEvaluator::Finalize(agg_fn_evals, tuple, dummy_dst);
       it.Next();
+      // Free any expr result allocations to prevent them accumulating excessively.
+      expr_results_pool_->Clear();
     }
   } else {
     while (!it.AtEnd()) {
       Tuple* tuple = it.GetTuple();
       AggFnEvaluator::Serialize(agg_fn_evals, tuple);
       it.Next();
+      // Free any expr result allocations to prevent them accumulating excessively.
+      expr_results_pool_->Clear();
     }
   }
 }
@@ -665,7 +625,7 @@ void PartitionedAggregationNode::Close(RuntimeState* state) {
   if (is_closed()) return;
 
   if (!singleton_output_tuple_returned_) {
-    GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, mem_pool_.get());
+    GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, singleton_tuple_pool_.get());
   }
 
   // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
@@ -682,8 +642,7 @@ void PartitionedAggregationNode::Close(RuntimeState* state) {
   // Close all the agg-fn-evaluators
   AggFnEvaluator::Close(agg_fn_evals_, state);
 
-  if (agg_fn_pool_.get() != nullptr) agg_fn_pool_->FreeAll();
-  if (mem_pool_.get() != nullptr) mem_pool_->FreeAll();
+  if (singleton_tuple_pool_.get() != nullptr) singleton_tuple_pool_->FreeAll();
   if (ht_ctx_.get() != nullptr) ht_ctx_->Close(state);
   ht_ctx_.reset();
   if (serialize_stream_.get() != nullptr) {
@@ -700,10 +659,10 @@ PartitionedAggregationNode::Partition::~Partition() {
 }
 
 Status PartitionedAggregationNode::Partition::InitStreams() {
-  agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker()));
+  agg_fn_perm_pool.reset(new MemPool(parent->expr_mem_tracker()));
   DCHECK_EQ(agg_fn_evals.size(), 0);
-  AggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_pool.get(),
-      parent->agg_fn_evals_, &agg_fn_evals);
+  AggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_perm_pool.get(),
+      parent->expr_results_pool(), parent->agg_fn_evals_, &agg_fn_evals);
   // Varlen aggregate function results are stored outside of aggregated_row_stream because
   // BufferedTupleStream doesn't support relocating varlen data stored in the stream.
   auto agg_slot = parent->intermediate_tuple_desc_->slots().begin() +
@@ -829,9 +788,9 @@ Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
   AggFnEvaluator::Close(agg_fn_evals, parent->state_);
   agg_fn_evals.clear();
 
-  if (agg_fn_pool.get() != NULL) {
-    agg_fn_pool->FreeAll();
-    agg_fn_pool.reset();
+  if (agg_fn_perm_pool.get() != nullptr) {
+    agg_fn_perm_pool->FreeAll();
+    agg_fn_perm_pool.reset();
   }
 
   hash_tbl->Close();
@@ -875,7 +834,7 @@ void PartitionedAggregationNode::Partition::Close(bool finalize_rows) {
     unaggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
   for (AggFnEvaluator* eval : agg_fn_evals) eval->Close(parent->state_);
-  if (agg_fn_pool.get() != NULL) agg_fn_pool->FreeAll();
+  if (agg_fn_perm_pool.get() != nullptr) agg_fn_perm_pool->FreeAll();
 }
 
 Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple(
@@ -1346,7 +1305,7 @@ Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
     // Pass 'true' because we need to keep the write block pinned. See Partition::Spill().
     int64_t mem = hash_partitions_[i]->aggregated_row_stream->BytesPinned(true);
     mem += hash_partitions_[i]->hash_tbl->ByteSize();
-    mem += hash_partitions_[i]->agg_fn_pool->total_reserved_bytes();
+    mem += hash_partitions_[i]->agg_fn_perm_pool->total_reserved_bytes();
     DCHECK_GT(mem, 0); // At least the hash table buckets should occupy memory.
     if (mem > max_freed_mem) {
       max_freed_mem = mem;
@@ -1432,17 +1391,6 @@ void PartitionedAggregationNode::ClosePartitions() {
   partition_pool_->Clear();
 }
 
-Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
-  AggFnEvaluator::FreeLocalAllocations(agg_fn_evals_);
-  for (Partition* partition : hash_partitions_) {
-    if (partition != nullptr) {
-      AggFnEvaluator::FreeLocalAllocations(partition->agg_fn_evals);
-    }
-  }
-  if (ht_ctx_.get() != nullptr) ht_ctx_->FreeLocalAllocations();
-  return ExecNode::QueryMaintenance(state);
-}
-
 // IR Generation for updating a single aggregation slot. Signature is:
 // void UpdateSlot(AggFnEvaluator* agg_expr_eval, AggTuple* agg_tuple, char** row)
 //

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index 72354cc..c522c1b 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -144,8 +144,6 @@ class PartitionedAggregationNode : public ExecNode {
   static const char* LLVM_CLASS_NAME;
 
  protected:
-  /// Frees local allocations from aggregate_evals_ and agg_fn_evals
-  virtual Status QueryMaintenance(RuntimeState* state);
   virtual std::string DebugString(int indentation_level) const;
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
@@ -213,13 +211,14 @@ class PartitionedAggregationNode : public ExecNode {
   /// The list of all aggregate operations for this exec node.
   std::vector<AggFn*> agg_fns_;
 
-  /// Evaluators for each aggregate function and backing MemPool. String data
-  /// returned by the aggregate functions is allocated via these evaluators.
-  /// These evaluatorss are only used for the non-grouping cases. For queries
-  /// with the group-by clause, each partition will clone these evaluators.
-  /// TODO: we really need to plumb through CHAR(N) for intermediate types.
+  /// Evaluators for each aggregate function. If this is a grouping aggregation, these
+  /// evaluators are only used to create cloned per-partition evaluators. The cloned
+  /// evaluators are then used to evaluate the functions. If this is a non-grouping
+  /// aggregation these evaluators are used directly to evaluate the functions.
+  ///
+  /// Permanent and result allocations for these allocators are allocated from
+  /// 'expr_perm_pool_' and 'expr_results_pool_' respectively.
   std::vector<AggFnEvaluator*> agg_fn_evals_;
-  boost::scoped_ptr<MemPool> agg_fn_pool_;
 
   /// Exprs used to evaluate input rows
   std::vector<ScalarExpr*> grouping_exprs_;
@@ -243,7 +242,7 @@ class PartitionedAggregationNode : public ExecNode {
   /// For non-grouping aggregations, the ownership of the pool's memory is transferred
   /// to the output batch on eos. The pool should not be Reset() to allow amortizing
   /// memory allocation over a series of Reset()/Open()/GetNext()* calls.
-  boost::scoped_ptr<MemPool> mem_pool_;
+  boost::scoped_ptr<MemPool> singleton_tuple_pool_;
 
   /// The current partition and iterator to the next row in its hash table that we need
   /// to return in GetNext()
@@ -415,9 +414,15 @@ class PartitionedAggregationNode : public ExecNode {
     /// is spilled or we are passing through all rows for this partition).
     boost::scoped_ptr<HashTable> hash_tbl;
 
-    /// Clone of parent's agg_fn_evals_ and backing MemPool.
+    /// Clone of parent's agg_fn_evals_. Permanent allocations come from
+    /// 'agg_fn_perm_pool' and result allocations come from the ExecNode's
+    /// 'expr_results_pool_'.
     std::vector<AggFnEvaluator*> agg_fn_evals;
-    boost::scoped_ptr<MemPool> agg_fn_pool;
+
+    /// Pool for permanent allocations for this partition's 'agg_fn_evals'. Freed at the
+    /// same times as 'agg_fn_evals' are closed: either when the partition is closed or
+    /// when it is spilled.
+    boost::scoped_ptr<MemPool> agg_fn_perm_pool;
 
     /// Tuple stream used to store aggregated rows. When the partition is not spilled,
     /// (meaning the hash table is maintained), this stream is pinned and contains the
@@ -447,20 +452,6 @@ class PartitionedAggregationNode : public ExecNode {
     return ht;
   }
 
-  /// Materializes 'row_batch' in either grouping or non-grouping case.
-  Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
-
-  /// Helper function called by GetNextInternal() to ensure that string data referenced in
-  /// 'row_batch' will live as long as 'row_batch's tuples. 'first_row_idx' indexes the
-  /// first row that should be processed in 'row_batch'.
-  Status HandleOutputStrings(RowBatch* row_batch, int first_row_idx);
-
-  /// Copies string data from the specified slot into 'pool', and sets the StringValues'
-  /// ptrs to the copied data. Copies data from all tuples in 'row_batch' from
-  /// 'first_row_idx' onwards. 'slot_desc' must have a var-len string type.
-  Status CopyStringData(const SlotDescriptor& slot_desc, RowBatch* row_batch,
-      int first_row_idx, MemPool* pool);
-
   /// Constructs singleton output tuple, allocating memory from pool.
   Tuple* ConstructSingletonOutputTuple(
       const std::vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 6b7b791..4b49fb0 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -129,24 +129,17 @@ string PhjBuilder::GetName() {
   return Substitute("Hash Join Builder (join_node_id=$0)", join_node_id_);
 }
 
-void PhjBuilder::FreeLocalAllocations() const {
-  if (ht_ctx_.get() != nullptr) ht_ctx_->FreeLocalAllocations();
-  for (const FilterContext& ctx : filter_ctxs_) {
-    if (ctx.expr_eval != nullptr) ctx.expr_eval->FreeLocalAllocations();
-  }
-}
-
 Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
-  RETURN_IF_ERROR(HashTableCtx::Create(&pool_, state, build_exprs_, build_exprs_,
+  RETURN_IF_ERROR(HashTableCtx::Create(&obj_pool_, state, build_exprs_, build_exprs_,
       HashTableStoresNulls(), is_not_distinct_from_, state->fragment_hash_seed(),
-      MAX_PARTITION_DEPTH, row_desc_->tuple_descriptors().size(), expr_mem_pool(),
-      &ht_ctx_));
+      MAX_PARTITION_DEPTH, row_desc_->tuple_descriptors().size(), expr_perm_pool_.get(),
+      expr_results_pool_.get(), expr_results_pool_.get(), &ht_ctx_));
 
   DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size());
   for (int i = 0; i < filter_exprs_.size(); ++i) {
-    RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, &pool_,
-        expr_mem_pool(), &filter_ctxs_[i].expr_eval));
+    RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, &obj_pool_,
+        expr_perm_pool_.get(), expr_results_pool_.get(), &filter_ctxs_[i].expr_eval));
   }
 
   partitions_created_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT);
@@ -210,8 +203,8 @@ Status PhjBuilder::Send(RuntimeState* state, RowBatch* batch) {
     }
   }
 
-  // Free any local allocations made during partitioning.
-  FreeLocalAllocations();
+  // Free any expr result allocations made during partitioning.
+  expr_results_pool_->Clear();
   COUNTER_ADD(num_build_rows_partitioned_, batch->num_rows());
   return Status::OK();
 }
@@ -266,7 +259,6 @@ Status PhjBuilder::FlushFinal(RuntimeState* state) {
 
 void PhjBuilder::Close(RuntimeState* state) {
   if (closed_) return;
-  FreeLocalAllocations();
   CloseAndDeletePartitions();
   if (ht_ctx_ != nullptr) ht_ctx_->Close(state);
   ht_ctx_.reset();
@@ -275,13 +267,13 @@ void PhjBuilder::Close(RuntimeState* state) {
   }
   ScalarExpr::Close(filter_exprs_);
   ScalarExpr::Close(build_exprs_);
-  pool_.Clear();
+  obj_pool_.Clear();
   DataSink::Close(state);
   closed_ = true;
 }
 
 void PhjBuilder::Reset() {
-  FreeLocalAllocations();
+  expr_results_pool_->Clear();
   non_empty_build_ = false;
   CloseAndDeletePartitions();
 }
@@ -712,8 +704,8 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) {
     }
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->GetQueryStatus());
-    // Free any local allocations made while inserting.
-    parent_->FreeLocalAllocations();
+    // Free any expr result allocations made while inserting.
+    parent_->expr_results_pool_->Clear();
     batch.Reset();
   } while (!eos);
 

