http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index c123a9b..e8dd524 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -277,9 +277,6 @@ class PhjBuilder : public DataSink {
   virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
       const TDataSink& tsink, RuntimeState* state) override;
 
-  /// Free local allocations made from expr evaluators during hash table construction.
-  void FreeLocalAllocations() const;
-
  private:
   /// Create and initialize a set of hash partitions for partitioning level 'level'.
   /// The previous hash partitions must have been cleared with ClearHashPartitions().
@@ -388,7 +385,7 @@ class PhjBuilder : public DataSink {
   const RowDescriptor* probe_row_desc_;
 
   /// Pool for objects with same lifetime as builder.
-  ObjectPool pool_;
+  ObjectPool obj_pool_;
 
   /// Client to the buffer pool, used to allocate build partition buffers and hash tables.
   /// When probing, the spilling algorithm keeps some build partitions in memory while

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 4faa12e..94b49b3 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -119,13 +119,22 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
   runtime_profile()->PrependChild(builder_->profile());
 
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(other_join_conjuncts_, state, pool_,
-      expr_mem_pool(), &other_join_conjunct_evals_));
-  AddEvaluatorsToFree(other_join_conjunct_evals_);
+      expr_perm_pool(), expr_results_pool(), &other_join_conjunct_evals_));
 
+  probe_expr_results_pool_.reset(new MemPool(mem_tracker()));
+
+  // We have to carefully set up expression evaluators in the HashTableCtx to use
+  // MemPools with appropriate lifetime.  The values of build exprs are only used
+  // temporarily while processing each build batch or when processing a probe row
+  // so can be stored in 'expr_results_pool_', which is freed during
+  // QueryMaintenance(). Values of probe exprs may need to live longer until the
+  // cache is reset so are stored in 'probe_expr_results_pool_', which is cleared
+  // manually at the appropriate time.
   RETURN_IF_ERROR(HashTableCtx::Create(pool_, state, build_exprs_, probe_exprs_,
       builder_->HashTableStoresNulls(), builder_->is_not_distinct_from(),
       state->fragment_hash_seed(), MAX_PARTITION_DEPTH,
-      child(1)->row_desc()->tuple_descriptors().size(), expr_mem_pool(), &ht_ctx_));
+      child(1)->row_desc()->tuple_descriptors().size(), expr_perm_pool(),
+      expr_results_pool(), probe_expr_results_pool_.get(), &ht_ctx_));
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_aware_eval_timer_ = ADD_TIMER(runtime_profile(), "NullAwareAntiJoinEvalTime");
   }
@@ -163,14 +172,14 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ht_ctx_->Open(state));
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(other_join_conjunct_evals_, state));
 
-  // Check for errors and free local allocations before opening children.
+  // Check for errors and free expr result allocations before opening children.
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
-  // The prepare functions of probe expressions may have done local allocations implicitly
-  // (e.g. calling UdfBuiltins::Lower()). The probe expressions' local allocations need to
-  // be freed now as they don't get freed again till probing. Other exprs' local allocations
-  // are freed in ExecNode::FreeLocalAllocations().
-  ht_ctx_->FreeProbeLocalAllocations();
+  // The prepare functions of probe expressions may have made result allocations implicitly
+  // (e.g. calling UdfBuiltins::Lower()). The probe expressions' expr result allocations need to
+  // be cleared now as they don't get cleared again till probing. Other exprs' result allocations
+  // are cleared in QueryMaintenance().
+  probe_expr_results_pool_->Clear();
 
   RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get()));
   RETURN_IF_ERROR(PrepareForProbe());
@@ -197,15 +206,6 @@ Status PartitionedHashJoinNode::AcquireResourcesForBuild(RuntimeState* state) {
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::QueryMaintenance(RuntimeState* state) {
-  // Build expressions may be evaluated during probing, so must be freed.
-  // Probe side expr is not included in QueryMaintenance(). We cache the probe expression
-  // values in ExprValuesCache. Local allocations need to survive until the cache is reset
-  // so we need to manually free probe expr local allocations.
-  if (ht_ctx_.get() != nullptr) ht_ctx_->FreeBuildLocalAllocations();
-  return ExecNode::QueryMaintenance(state);
-}
-
 Status PartitionedHashJoinNode::Reset(RuntimeState* state) {
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_probe_output_idx_ = -1;
@@ -260,6 +260,7 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
   ScalarExpr::Close(build_exprs_);
   ScalarExpr::Close(probe_exprs_);
   ScalarExpr::Close(other_join_conjuncts_);
+  if (probe_expr_results_pool_ != nullptr) probe_expr_results_pool_->FreeAll();
   BlockingJoinNode::Close(state);
 }
 
@@ -591,10 +592,10 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
         RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch));
       }
     }
-    // Free local allocations of the probe side expressions only after ExprValuesCache
-    // has been reset.
+    // Free expr result allocations of the probe side expressions only after
+    // ExprValuesCache has been reset.
     DCHECK(ht_ctx_->expr_values_cache()->AtEnd());
-    ht_ctx_->FreeProbeLocalAllocations();
+    probe_expr_results_pool_->Clear();
 
     // We want to return as soon as we have attached a tuple stream to the out_batch
     // (before preparing a new partition). The attached tuple stream will be recycled

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index cb51b9d..572be34 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -111,7 +111,6 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   virtual void Close(RuntimeState* state) override;
 
  protected:
-  virtual Status QueryMaintenance(RuntimeState* state) override;
   virtual void AddToDebugString(
       int indentation_level, std::stringstream* out) const override;
 
@@ -423,6 +422,11 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// and probing of the hash tables.
   boost::scoped_ptr<HashTableCtx> ht_ctx_;
 
+  /// MemPool that stores allocations that hold results from evaluation of probe
+  /// exprs by 'ht_ctx_'. Cached probe expression values may reference memory in this
+  /// pool.
+  boost::scoped_ptr<MemPool> probe_expr_results_pool_;
+
   /// The iterator that corresponds to the look up of current_probe_row_.
   HashTable::Iterator hash_tbl_iterator_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 2e8408a..9c20ff3 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -97,9 +97,10 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
       RETURN_IF_ERROR(results_->AddOneRow(result_row, scales));
       ++current_batch_row;
     }
+    // Prevent expr result allocations from accumulating.
+    expr_results_pool_->Clear();
     // Signal the consumer.
     results_ = nullptr;
-    ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_);
     consumer_cv_.notify_all();
   }
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 94f73c0..eba7727 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -115,8 +115,7 @@ Status ScanNode::Prepare(RuntimeState* state) {
   DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size());
   for (int i = 0; i < filter_exprs_.size(); ++i) {
     RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, 
pool_,
-        expr_mem_pool(), &filter_ctxs_[i].expr_eval));
-    AddEvaluatorToFree(filter_ctxs_[i].expr_eval);
+        expr_perm_pool(), expr_results_pool(), &filter_ctxs_[i].expr_eval));
   }
 
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 16a09e4..3ed8b4a 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -44,12 +44,13 @@ static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;
 
 ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* 
scan_node,
     HdfsPartitionDescriptor* partition_desc, DiskIoMgr::ScanRange* scan_range,
-    const vector<FilterContext>& filter_ctxs)
+    const vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool)
   : state_(state),
     scan_node_(scan_node),
     partition_desc_(partition_desc),
     num_completed_io_buffers_(0),
-    filter_ctxs_(filter_ctxs) {
+    filter_ctxs_(filter_ctxs),
+    expr_results_pool_(expr_results_pool) {
   AddStream(scan_range);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index bd5623c..c13d26f 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -65,7 +65,8 @@ class ScannerContext {
   /// get pushed to) and the scan range to process.
   /// This context starts with 1 stream.
   ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
-      DiskIoMgr::ScanRange* scan_range, const std::vector<FilterContext>& 
filter_ctxs);
+      DiskIoMgr::ScanRange* scan_range, const std::vector<FilterContext>& 
filter_ctxs,
+      MemPool* expr_results_pool);
 
   /// Destructor verifies that all stream objects have been released.
   ~ScannerContext();
@@ -311,7 +312,7 @@ class ScannerContext {
   int num_completed_io_buffers() const { return num_completed_io_buffers_; }
   HdfsPartitionDescriptor* partition_descriptor() { return partition_desc_; }
   const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }
-
+  MemPool* expr_results_pool() const { return expr_results_pool_; }
  private:
   friend class Stream;
 
@@ -329,6 +330,18 @@ class ScannerContext {
   /// Filter contexts for all filters applicable to this scan. Memory attached to the
   /// context is owned by the scan node.
   std::vector<FilterContext> filter_ctxs_;
+
+  /// MemPool used for allocations that hold results of expression evaluation in the
+  /// scanner and 'filter_ctxs_'. Must be thread-local since MemPool is not thread-safe.
+  /// Owned by ScannerThread() in the multi-threaded scan node and by the ExecNode in the
+  /// single-threaded scan node implementation.
+  ///
+  /// The scanner is responsible for clearing the MemPool periodically after expression
+  /// evaluation to prevent memory from accumulating.
+  ///
+  /// TODO: IMPALA-6015: it should be possible to simplify the lifecycle of this pool and
+  /// filter_ctxs_ once the multithreaded scan node is removed.
+  MemPool* const expr_results_pool_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -52,12 +52,11 @@ Status SortNode::Init(const TPlanNode& tnode, RuntimeState* state) {
 Status SortNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
-  less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, 
nulls_first_));
   sorter_.reset(
-      new Sorter(*less_than_, sort_tuple_exprs_, &row_descriptor_, 
mem_tracker(),
-          &buffer_pool_client_, resource_profile_.spillable_buffer_size,
-          runtime_profile(), state, id(), true));
-  RETURN_IF_ERROR(sorter_->Prepare(pool_, expr_mem_pool()));
+      new Sorter(ordering_exprs_, is_asc_order_, nulls_first_, 
sort_tuple_exprs_,
+          &row_descriptor_, mem_tracker(), &buffer_pool_client_,
+          resource_profile_.spillable_buffer_size, runtime_profile(), state, 
id(), true));
+  RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, 
sorter_->ComputeMinReservation());
   AddCodegenDisabledMessage(state);
   return Status::OK();
@@ -67,7 +66,7 @@ void SortNode::Codegen(RuntimeState* state) {
   DCHECK(state->ShouldCodegen());
   ExecNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
-  Status codegen_status = less_than_->Codegen(state);
+  Status codegen_status = sorter_->Codegen(state);
   runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
 }
 
@@ -80,7 +79,6 @@ Status SortNode::Open(RuntimeState* state) {
   if (!buffer_pool_client_.is_registered()) {
     RETURN_IF_ERROR(ClaimBufferReservation(state));
   }
-  RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool()));
   RETURN_IF_ERROR(sorter_->Open());
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
@@ -153,7 +151,6 @@ Status SortNode::Reset(RuntimeState* state) {
 
 void SortNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  if (less_than_.get() != nullptr) less_than_->Close(state);
   if (sorter_ != nullptr) sorter_->Close(state);
   sorter_.reset();
   ScalarExpr::Close(ordering_exprs_);
@@ -161,11 +158,6 @@ void SortNode::Close(RuntimeState* state) {
   ExecNode::Close(state);
 }
 
-Status SortNode::QueryMaintenance(RuntimeState* state) {
-  sorter_->FreeLocalAllocations();
-  return ExecNode::QueryMaintenance(state);
-}
-
 void SortNode::DebugString(int indentation_level, stringstream* out) const {
   *out << string(indentation_level * 2, ' ');
   *out << "SortNode(" << ScalarExpr::DebugString(ordering_exprs_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -46,7 +46,6 @@ class SortNode : public ExecNode {
   virtual void Close(RuntimeState* state);
 
  protected:
-  virtual Status QueryMaintenance(RuntimeState* state);
   virtual void DebugString(int indentation_level, std::stringstream* out) 
const;
 
  private:
@@ -56,9 +55,6 @@ class SortNode : public ExecNode {
   /// Number of rows to skip.
   int64_t offset_;
 
-  /// Compares tuples according to 'ordering_exprs'.
-  boost::scoped_ptr<TupleRowComparator> less_than_;
-
   /// Expressions and parameters used for tuple comparison.
   std::vector<ScalarExpr*> ordering_exprs_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/topn-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -75,8 +75,7 @@ Status TopNNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   tuple_pool_.reset(new MemPool(mem_tracker()));
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_tuple_exprs_, state, 
pool_,
-      expr_mem_pool(), &output_tuple_expr_evals_));
-  AddEvaluatorsToFree(output_tuple_expr_evals_);
+      expr_perm_pool(), expr_results_pool(), &output_tuple_expr_evals_));
   tuple_row_less_than_.reset(
       new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
   output_tuple_desc_ = row_descriptor_.tuple_descriptors()[0];
@@ -139,7 +138,8 @@ void TopNNode::Codegen(RuntimeState* state) {
 Status TopNNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
-  RETURN_IF_ERROR(tuple_row_less_than_->Open(pool_, state, expr_mem_pool()));
+  RETURN_IF_ERROR(
+      tuple_row_less_than_->Open(pool_, state, expr_perm_pool(), 
expr_results_pool()));
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(output_tuple_expr_evals_, state));
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
@@ -231,11 +231,6 @@ void TopNNode::Close(RuntimeState* state) {
   ExecNode::Close(state);
 }
 
-Status TopNNode::QueryMaintenance(RuntimeState* state) {
-  tuple_row_less_than_->FreeLocalAllocations();
-  return ExecNode::QueryMaintenance(state);
-}
-
 // Reverse the order of the tuples in the priority queue
 void TopNNode::PrepareForOutput() {
   sorted_top_n_.resize(priority_queue_.size());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/topn-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.h b/be/src/exec/topn-node.h
index b3cf1c0..e12cfc3 100644
--- a/be/src/exec/topn-node.h
+++ b/be/src/exec/topn-node.h
@@ -51,7 +51,6 @@ class TopNNode : public ExecNode {
   virtual void Close(RuntimeState* state);
 
  protected:
-  virtual Status QueryMaintenance(RuntimeState* state);
   virtual void DebugString(int indentation_level, std::stringstream* out) 
const;
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/union-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -80,8 +80,7 @@ Status UnionNode::Prepare(RuntimeState* state) {
   for (const vector<ScalarExpr*>& const_exprs : const_exprs_lists_) {
     vector<ScalarExprEvaluator*> const_expr_evals;
     RETURN_IF_ERROR(ScalarExprEvaluator::Create(const_exprs, state, pool_,
-        expr_mem_pool(), &const_expr_evals));
-    AddEvaluatorsToFree(const_expr_evals);
+        expr_perm_pool(), expr_results_pool(), &const_expr_evals));
     const_expr_evals_lists_.push_back(const_expr_evals);
   }
 
@@ -89,8 +88,7 @@ Status UnionNode::Prepare(RuntimeState* state) {
   for (const vector<ScalarExpr*>& child_exprs : child_exprs_lists_) {
     vector<ScalarExprEvaluator*> child_expr_evals;
     RETURN_IF_ERROR(ScalarExprEvaluator::Create(child_exprs, state, pool_,
-        expr_mem_pool(), &child_expr_evals));
-    AddEvaluatorsToFree(child_expr_evals);
+        expr_perm_pool(), expr_results_pool(), &child_expr_evals));
     child_expr_evals_lists_.push_back(child_expr_evals);
   }
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/unnest-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/unnest-node.cc b/be/src/exec/unnest-node.cc
--- a/be/src/exec/unnest-node.cc
+++ b/be/src/exec/unnest-node.cc
@@ -84,7 +84,7 @@ Status UnnestNode::Prepare(RuntimeState* state) {
   item_byte_size_ = item_tuple_desc->byte_size();
 
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(*coll_expr_, state, pool_,
-      expr_mem_pool(), &coll_expr_eval_));
+      expr_perm_pool(), expr_results_pool(), &coll_expr_eval_));
 
   // Set the coll_slot_desc_ and the corresponding tuple index used for 
manually
   // evaluating the collection SlotRef and for projection.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/agg-fn-evaluator.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator.cc b/be/src/exprs/agg-fn-evaluator.cc
--- a/be/src/exprs/agg-fn-evaluator.cc
+++ b/be/src/exprs/agg-fn-evaluator.cc
@@ -71,10 +71,9 @@ typedef AnyVal (*FinalizeFn)(FunctionContext*, const 
AnyVal&);
 
 const char* AggFnEvaluator::LLVM_CLASS_NAME = "class.impala::AggFnEvaluator";
 
-AggFnEvaluator::AggFnEvaluator(const AggFn& agg_fn, MemPool* mem_pool, bool 
is_clone)
+AggFnEvaluator::AggFnEvaluator(const AggFn& agg_fn, bool is_clone)
   : is_clone_(is_clone),
-    agg_fn_(agg_fn),
-    mem_pool_(mem_pool) {
+    agg_fn_(agg_fn) {
 }
 
 AggFnEvaluator::~AggFnEvaluator() {
@@ -90,26 +89,27 @@ const ColumnType& AggFnEvaluator::intermediate_type() const {
 }
 
 Status AggFnEvaluator::Create(const AggFn& agg_fn, RuntimeState* state, 
ObjectPool* pool,
-    MemPool* mem_pool, AggFnEvaluator** result) {
+    MemPool* expr_perm_pool, MemPool* expr_results_pool, AggFnEvaluator** 
result) {
   *result = nullptr;
 
   // Create a new AggFn evaluator.
-  AggFnEvaluator* agg_fn_eval = pool->Add(new AggFnEvaluator(agg_fn, mem_pool, 
false));
-  agg_fn_eval->agg_fn_ctx_.reset(FunctionContextImpl::CreateContext(state, 
mem_pool,
-      agg_fn.GetIntermediateTypeDesc(), agg_fn.GetOutputTypeDesc(),
+  AggFnEvaluator* agg_fn_eval = pool->Add(new AggFnEvaluator(agg_fn, false));
+  agg_fn_eval->agg_fn_ctx_.reset(FunctionContextImpl::CreateContext(state, 
expr_perm_pool,
+      expr_results_pool, agg_fn.GetIntermediateTypeDesc(), 
agg_fn.GetOutputTypeDesc(),
       agg_fn.arg_type_descs()));
 
   Status status;
   // Create the evaluators for the input expressions.
   for (const ScalarExpr* input_expr : agg_fn.children()) {
     ScalarExprEvaluator* input_eval;
-    status = ScalarExprEvaluator::Create(*input_expr, state, pool, mem_pool, 
&input_eval);
+    status = ScalarExprEvaluator::Create(
+        *input_expr, state, pool, expr_perm_pool, expr_results_pool, 
&input_eval);
     if (UNLIKELY(!status.ok())) goto cleanup;
     agg_fn_eval->input_evals_.push_back(input_eval);
     DCHECK(&input_eval->root() == input_expr);
 
     AnyVal* staging_input_val;
-    status = AllocateAnyVal(state, mem_pool, input_expr->type(),
+    status = AllocateAnyVal(state, expr_perm_pool, input_expr->type(),
         "Could not allocate aggregate expression input value", 
&staging_input_val);
     agg_fn_eval->staging_input_vals_.push_back(staging_input_val);
     if (UNLIKELY(!status.ok())) goto cleanup;
@@ -117,11 +117,11 @@ Status AggFnEvaluator::Create(const AggFn& agg_fn, RuntimeState* state, ObjectPo
   DCHECK_EQ(agg_fn.GetNumChildren(), agg_fn_eval->input_evals_.size());
   DCHECK_EQ(agg_fn_eval->staging_input_vals_.size(), 
agg_fn_eval->input_evals_.size());
 
-  status = AllocateAnyVal(state, mem_pool, agg_fn.intermediate_type(),
+  status = AllocateAnyVal(state, expr_perm_pool, agg_fn.intermediate_type(),
       "Could not allocate aggregate expression intermediate value",
       &(agg_fn_eval->staging_intermediate_val_));
   if (UNLIKELY(!status.ok())) goto cleanup;
-  status = AllocateAnyVal(state, mem_pool, agg_fn.intermediate_type(),
+  status = AllocateAnyVal(state, expr_perm_pool, agg_fn.intermediate_type(),
       "Could not allocate aggregate expression merge input value",
       &(agg_fn_eval->staging_merge_input_val_));
   if (UNLIKELY(!status.ok())) goto cleanup;
@@ -140,10 +140,12 @@ cleanup:
 }
 
 Status AggFnEvaluator::Create(const vector<AggFn*>& agg_fns, RuntimeState* 
state,
-    ObjectPool* pool, MemPool* mem_pool, vector<AggFnEvaluator*>* evals) {
+    ObjectPool* pool, MemPool* expr_perm_pool, MemPool* expr_results_pool,
+    vector<AggFnEvaluator*>* evals) {
   for (const AggFn* agg_fn : agg_fns) {
     AggFnEvaluator* agg_fn_eval;
-    RETURN_IF_ERROR(AggFnEvaluator::Create(*agg_fn, state, pool, mem_pool, 
&agg_fn_eval));
+    RETURN_IF_ERROR(AggFnEvaluator::Create(*agg_fn, state, pool, 
expr_perm_pool,
+        expr_results_pool, &agg_fn_eval));
     evals->push_back(agg_fn_eval);
   }
   return Status::OK();
@@ -176,7 +178,6 @@ void AggFnEvaluator::Close(RuntimeState* state) {
   if (closed_) return;
   closed_ = true;
   if (!is_clone_) ScalarExprEvaluator::Close(input_evals_, state);
-  FreeLocalAllocations();
   agg_fn_ctx_->impl()->Close();
   agg_fn_ctx_.reset();
   input_evals_.clear();
@@ -494,11 +495,12 @@ void AggFnEvaluator::SerializeOrFinalize(Tuple* src,
   }
 }
 
-void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* mem_pool,
-    AggFnEvaluator** cloned_eval) const {
+void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* expr_perm_pool,
+    MemPool* expr_results_pool, AggFnEvaluator** cloned_eval) const {
   DCHECK(opened_);
-  *cloned_eval = pool->Add(new AggFnEvaluator(agg_fn_, mem_pool, true));
-  (*cloned_eval)->agg_fn_ctx_.reset(agg_fn_ctx_->impl()->Clone(mem_pool));
+  *cloned_eval = pool->Add(new AggFnEvaluator(agg_fn_, true));
+  (*cloned_eval)->agg_fn_ctx_.reset(
+      agg_fn_ctx_->impl()->Clone(expr_perm_pool, expr_results_pool));
   DCHECK_EQ((*cloned_eval)->input_evals_.size(), 0);
   (*cloned_eval)->input_evals_ = input_evals_;
   (*cloned_eval)->staging_input_vals_ = staging_input_vals_;
@@ -507,21 +509,22 @@ void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* mem_pool,
   (*cloned_eval)->opened_ = true;
 }
 
-void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* mem_pool,
-    const vector<AggFnEvaluator*>& evals,
+void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* expr_perm_pool,
+    MemPool* expr_results_pool, const vector<AggFnEvaluator*>& evals,
     vector<AggFnEvaluator*>* cloned_evals) {
   for (const AggFnEvaluator* eval : evals) {
     AggFnEvaluator* cloned_eval;
-    eval->ShallowClone(pool, mem_pool, &cloned_eval);
+    eval->ShallowClone(pool, expr_perm_pool, expr_results_pool, &cloned_eval);
     cloned_evals->push_back(cloned_eval);
   }
 }
 
-void AggFnEvaluator::FreeLocalAllocations() {
-  ScalarExprEvaluator::FreeLocalAllocations(input_evals_);
-  agg_fn_ctx_->impl()->FreeLocalAllocations();
-}
-
-void AggFnEvaluator::FreeLocalAllocations(const vector<AggFnEvaluator*>& 
evals) {
-  for (AggFnEvaluator* eval : evals) eval->FreeLocalAllocations();
+vector<ScopedResultsPool> ScopedResultsPool::Create(
+      const vector<AggFnEvaluator*>& evals, MemPool* new_results_pool) {
+  vector<ScopedResultsPool> result;
+  result.reserve(evals.size());
+  for (AggFnEvaluator* eval : evals) {
+    result.emplace_back(eval, new_results_pool);
+  }
+  return result;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/agg-fn-evaluator.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h
index 2ce968c..e8607a4 100644
--- a/be/src/exprs/agg-fn-evaluator.h
+++ b/be/src/exprs/agg-fn-evaluator.h
@@ -62,16 +62,26 @@ class AggFnEvaluator {
  public:
   /// Creates an AggFnEvaluator object from the aggregate expression 'agg_fn'.
   /// The evaluator is added to 'pool' and returned in 'eval'. This will also
-  /// create a single evaluator for each input expression. All allocations will come
-  /// from 'mem_pool'. Note that it's the responsibility to call Close() all evaluators
-  /// even if this function returns error status on initialization failure.
+  /// create a single evaluator for each input expression.
+  ///
+  /// Permanent allocations (i.e. those that must live until the evaluator is closed) come
+  /// from 'expr_perm_pool'.  Allocations that may contain expr results come from
+  /// 'expr_results_pool'. Lifetime of memory in 'expr_results_pool' is managed by the
+  /// owner of the pool and may freed at any time except when the evaluator is in the
+  /// middle of evaluating the expression. These pools can be shared between evaluators
+  /// (so long as the required memory lifetimes are compatible) but cannot be shared
+  /// between threads since MemPools are not thread-safe.
+  ///
+  /// Note that the caller is responsible to call Close() on all evaluators even if this
+  /// function returns error status on initialization failure.
   static Status Create(const AggFn& agg_fn, RuntimeState* state, ObjectPool* 
pool,
-      MemPool* mem_pool, AggFnEvaluator** eval) WARN_UNUSED_RESULT;
+      MemPool* expr_perm_pool, MemPool* expr_results_pool,
+      AggFnEvaluator** eval) WARN_UNUSED_RESULT;
 
   /// Convenience functions for creating evaluators for multiple aggregate functions.
   static Status Create(const std::vector<AggFn*>& agg_fns, RuntimeState* state,
-      ObjectPool* pool, MemPool* mem_pool, std::vector<AggFnEvaluator*>* evals)
-      WARN_UNUSED_RESULT;
+      ObjectPool* pool, MemPool* expr_perm_pool, MemPool* expr_results_pool,
+      vector<AggFnEvaluator*>* evals) WARN_UNUSED_RESULT;
 
   ~AggFnEvaluator();
 
@@ -87,17 +97,17 @@ class AggFnEvaluator {
   /// Used by PartitionedAggregation node to initialize one evaluator per partition.
   /// Avoid the overhead of re-initializing an evaluator (e.g. calling GetConstVal()
   /// on the input expressions). Cannot be called until after Open() has been called.
-  /// 'cloned_eval' is a shallow copy of this evaluator: all input values, staging
+  /// 'cloned_eval' is a shallow copy of this evaluator: all input evaluators, staging
   /// intermediate values and merge values are shared with the original evaluator. Only
   /// the FunctionContext 'agg_fn_ctx' is cloned for resource isolation per partition.
   /// So, it's not safe to use cloned evaluators concurrently.
-  void ShallowClone(
-      ObjectPool* pool, MemPool* mem_pool, AggFnEvaluator** cloned_eval) const;
+  void ShallowClone(ObjectPool* pool, MemPool* expr_perm_pool, MemPool* 
expr_results_pool,
+      AggFnEvaluator** cloned_eval) const;
 
   /// Convenience function for cloning multiple evaluators. The newly cloned evaluators
   /// are appended to 'cloned_evals'.
-  static void ShallowClone(ObjectPool* pool, MemPool* mem_pool,
-      const std::vector<AggFnEvaluator*>& evals,
+  static void ShallowClone(ObjectPool* pool, MemPool* expr_perm_pool,
+      MemPool* expr_results_pool, const std::vector<AggFnEvaluator*>& evals,
       std::vector<AggFnEvaluator*>* cloned_evals);
 
   /// Free resources owned by the evaluator.
@@ -110,56 +120,70 @@ class AggFnEvaluator {
 
   ScalarExprEvaluator* const* IR_ALWAYS_INLINE input_evals() const;
 
-  /// Call the initialization function of the AggFn. May update 'dst'.
+  /// Call the initialization function of the AggFn. May update 'dst'. Any var-len string
+  /// data referenced from the tuple must be backed by an allocation from
+  /// FunctionContext::Allocate() (which is ultimately backed by the permanent MemPool).
   void Init(Tuple* dst);
 
   /// Updates the intermediate state dst based on adding the input src row. This can be
   /// called either to drive the UDA's Update() or Merge() function, depending on whether
-  /// the AggFn is a merging aggregation.
+  /// the AggFn is a merging aggregation. Any var-len string data referenced from the
+  /// tuple must be backed by an allocation from FunctionContext::Allocate() (which is
+  /// ultimately backed by the permanent MemPool).
   void Add(const TupleRow* src, Tuple* dst);
 
   /// Updates the intermediate state dst to remove the input src row, i.e. undo
-  /// Add(src, dst). Only used internally for analytic fn builtins.
+  /// Add(src, dst). Only used internally for analytic fn builtins. Any 
var-len string
+  /// data referenced from the tuple must be backed by an expr-managed 
allocation from
+  /// FunctionContext::Allocate() (which is ultimately backed by the permanent 
MemPool).
   void Remove(const TupleRow* src, Tuple* dst);
 
   /// Explicitly does a merge, even if this evaluator is not marked as merging.
   /// This is used by the partitioned agg node when it needs to merge spill 
results.
-  /// In the non-spilling case, this node would normally not merge.
+  /// In the non-spilling case, this node would normally not merge. Any 
var-len string
+  /// data referenced from the tuple must be backed by an expr-managed 
allocation from
+  /// FunctionContext::Allocate() (which is ultimately backed by the permanent 
MemPool).
   void Merge(Tuple* src, Tuple* dst);
 
   /// Flattens any intermediate values containing pointers, and frees any 
memory
-  /// allocated during the init, update and merge phases.
+  /// allocated during the init, update and merge phases. Note that a 
variable-length
+  /// string result is backed by the results MemPool, so the caller must be 
careful not to
+  /// clear that pool until it is done with the results of expr evaluation.
   void Serialize(Tuple* dst);
 
   /// Does one final transformation of the aggregated value in 'agg_val' and 
stores the
   /// result in 'output_val'. Also frees the resources allocated during init, 
update and
-  /// merge phases.
+  /// merge phases. Note that variable-length string results are backed by the 
results
+  /// MemPool, so the caller must be careful not to clear that pool until it 
is done
+  /// with the results of expr evaluation.
   void Finalize(Tuple* agg_val, Tuple* output_val);
 
   /// Puts the finalized value from Tuple* src in Tuple* dst just as 
Finalize() does.
   /// However, unlike Finalize(), GetValue() does not clean up state in src.
   /// GetValue() can be called repeatedly with the same src. Only used 
internally for
-  /// analytic fn builtins. Note that StringVal result is from local 
allocation (which
-  /// will be freed in the next QueryMaintenance()) so it needs to be copied 
out if it
-  /// needs to survive beyond QueryMaintenance() (e.g. if 'dst' lives in a row 
batch).
+  /// analytic fn builtins. Note that variable-length string results are 
backed by
+  /// the results MemPool, so the caller must be careful not to clear the 
results pool
+  /// until it is done with the results of expr evaluation.
   void GetValue(Tuple* src, Tuple* dst);
 
   /// Helper functions for calling the above functions on many evaluators.
   static void Init(const std::vector<AggFnEvaluator*>& evals, Tuple* dst);
-  static void Add(const std::vector<AggFnEvaluator*>& evals, const TupleRow* 
src,
-      Tuple* dst);
-  static void Remove(const std::vector<AggFnEvaluator*>& evals,
-      const TupleRow* src, Tuple* dst);
-  static void Serialize(const std::vector<AggFnEvaluator*>& evals,
-      Tuple* dst);
-  static void GetValue(const std::vector<AggFnEvaluator*>& evals, Tuple* src,
-      Tuple* dst);
-  static void Finalize(const std::vector<AggFnEvaluator*>& evals, Tuple* src,
-      Tuple* dst);
-
-  /// Free local allocations made in UDA functions and input arguments' evals.
-  void FreeLocalAllocations();
-  static void FreeLocalAllocations(const std::vector<AggFnEvaluator*>& evals);
+  static void Add(
+      const std::vector<AggFnEvaluator*>& evals, const TupleRow* src, Tuple* 
dst);
+  static void Remove(
+      const std::vector<AggFnEvaluator*>& evals, const TupleRow* src, Tuple* 
dst);
+  static void Serialize(const std::vector<AggFnEvaluator*>& evals, Tuple* dst);
+  static void GetValue(const std::vector<AggFnEvaluator*>& evals, Tuple* src, 
Tuple* dst);
+  static void Finalize(const std::vector<AggFnEvaluator*>& evals, Tuple* src, 
Tuple* dst);
+
+  /// Replaces the current pool used for the aggregate function's result 
allocations
+  /// with 'new_results_pool' and returns the previously-used pool. Useful if 
the
+  /// caller wants functions like Serialize(), Finalize() and GetValue() to 
allocate
+  /// from a different MemPool. Does *not* change the pool for the input 
exprs.
+  /// This should generally be used via ScopedResultsPool instead of directly.
+  MemPool* SwapResultsPool(MemPool* new_results_pool) {
+    return agg_fn_ctx_->impl()->SwapResultsPool(new_results_pool);
+  }
 
   std::string DebugString() const;
   static std::string DebugString(const std::vector<AggFnEvaluator*>& evals);
@@ -178,10 +202,6 @@ class AggFnEvaluator {
 
   const AggFn& agg_fn_;
 
-  /// Pointer to the MemPool which all allocations come from.
-  /// Owned by the exec node which owns this evaluator.
-  MemPool* mem_pool_ = nullptr;
-
   /// This contains runtime state such as constant input arguments to the 
aggregate
   /// functions and a FreePool from which the intermediate values are 
allocated.
   /// Owned by this evaluator.
@@ -202,7 +222,7 @@ class AggFnEvaluator {
   impala_udf::AnyVal* staging_merge_input_val_ = nullptr;
 
   /// Use Create() instead.
-  AggFnEvaluator(const AggFn& agg_fn, MemPool* mem_pool, bool is_clone);
+  AggFnEvaluator(const AggFn& agg_fn, bool is_clone);
 
   /// Return the intermediate type of the aggregate function.
   inline const SlotDescriptor& intermediate_slot_desc() const;
@@ -221,10 +241,9 @@ class AggFnEvaluator {
   /// Sets up the arguments to call 'fn'. This converts from the agg-expr 
signature,
   /// taking TupleRow to the UDA signature taking AnyVals. Writes the 
serialize/finalize
   /// result to the given destination slot/tuple. 'fn' can be NULL to indicate 
the src
-  /// value should simply be written into the destination. Note that StringVal 
result is
-  /// from local allocation (which will be freed in the next 
QueryMaintenance()) so it
-  /// needs to be copied out if it needs to survive beyond QueryMaintenance() 
(e.g. if
-  /// 'dst' lives in a row batch).
+  /// value should simply be written into the destination. Note that a 
variable-length
+  /// string result is backed by the results MemPool, so the caller must be 
careful not to
+  /// clear that pool until it is done with the results of expr evaluation.
   void SerializeOrFinalize(Tuple* src, const SlotDescriptor& dst_slot_desc,
       Tuple* dst, void* fn);
 
@@ -289,8 +308,24 @@ inline void AggFnEvaluator::Finalize(const 
std::vector<AggFnEvaluator*>& evals,
   }
 }
 
+/// Utility class to swap in a different results pool for the aggregate 
functions.
+/// The previous results pool is restored when this goes out of scope.
+class ScopedResultsPool {
+ public:
+  ScopedResultsPool(AggFnEvaluator* agg_fn_eval, MemPool* new_results_pool)
+    : agg_fn_eval_(agg_fn_eval),
+      prev_results_pool_(agg_fn_eval->SwapResultsPool(new_results_pool)) {}
+
+  ~ScopedResultsPool() { agg_fn_eval_->SwapResultsPool(prev_results_pool_); }
 
+  /// Helper to swap in the same pool to many evaluators.
+  static std::vector<ScopedResultsPool> Create(
+      const std::vector<AggFnEvaluator*>& evals, MemPool* new_results_pool);
 
+ private:
+  AggFnEvaluator* const agg_fn_eval_;
+  MemPool* const prev_results_pool_;
+};
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/case-expr.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/case-expr.cc b/be/src/exprs/case-expr.cc
index 322c975..5a3d1fb 100644
--- a/be/src/exprs/case-expr.cc
+++ b/be/src/exprs/case-expr.cc
@@ -59,11 +59,11 @@ Status 
CaseExpr::OpenEvaluator(FunctionContext::FunctionStateScope scope,
   fn_ctx->SetFunctionState(FunctionContext::THREAD_LOCAL, case_state);
 
   const ColumnType& case_val_type = has_case_expr_ ? GetChild(0)->type() : 
TYPE_BOOLEAN;
-  RETURN_IF_ERROR(AllocateAnyVal(state, eval->mem_pool(), case_val_type,
+  RETURN_IF_ERROR(AllocateAnyVal(state, eval->expr_perm_pool(), case_val_type,
       "Could not allocate expression value", &case_state->case_val));
   const ColumnType& when_val_type =
       has_case_expr_ ? GetChild(1)->type() : GetChild(0)->type();
-  RETURN_IF_ERROR(AllocateAnyVal(state, eval->mem_pool(), when_val_type,
+  RETURN_IF_ERROR(AllocateAnyVal(state, eval->expr_perm_pool(), when_val_type,
       "Could not allocate expression value", &case_state->when_val));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/cast-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/cast-functions-ir.cc 
b/be/src/exprs/cast-functions-ir.cc
index 518dd5a..26bbe9b 100644
--- a/be/src/exprs/cast-functions-ir.cc
+++ b/be/src/exprs/cast-functions-ir.cc
@@ -185,7 +185,7 @@ StringVal CastFunctions::CastToChar(FunctionContext* ctx, 
const StringVal& val)
   DCHECK_GE(type.len, 1);
   char* cptr;
   if (type.len > val.len) {
-    cptr = reinterpret_cast<char*>(ctx->impl()->AllocateLocal(type.len));
+    cptr = reinterpret_cast<char*>(ctx->impl()->AllocateForResults(type.len));
     if (UNLIKELY(cptr == NULL)) {
       DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
       return StringVal::null();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index bf7808d..a460f5f 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -1234,7 +1234,8 @@ void ExprTest::TestSingleLiteralConstruction(
 
   Literal* expr = CreateLiteral(type, string_val);
   ScalarExprEvaluator* eval;
-  EXPECT_OK(ScalarExprEvaluator::Create(*expr, &state, &pool, &mem_pool, 
&eval));
+  EXPECT_OK(
+      ScalarExprEvaluator::Create(*expr, &state, &pool, &mem_pool, &mem_pool, 
&eval));
   EXPECT_OK(eval->Open(&state));
   EXPECT_EQ(0, RawValue::Compare(eval->GetValue(nullptr), &value, type))
       << "type: " << type << ", value: " << value;
@@ -1252,7 +1253,8 @@ TEST_F(ExprTest, NullLiteral) {
 
     NullLiteral expr(static_cast<PrimitiveType>(type));
     ScalarExprEvaluator* eval;
-    EXPECT_OK(ScalarExprEvaluator::Create(expr, &state, &pool, &mem_pool, 
&eval));
+    EXPECT_OK(
+        ScalarExprEvaluator::Create(expr, &state, &pool, &mem_pool, &mem_pool, 
&eval));
     EXPECT_OK(eval->Open(&state));
     EXPECT_TRUE(eval->GetValue(nullptr) == nullptr);
     eval->Close(&state);
@@ -3293,12 +3295,18 @@ TEST_F(ExprTest, StringFunctions) {
   StringVal bam(static_cast<uint8_t*>(short_buf->data()), 
StringVal::MAX_LENGTH);
   auto r4 = StringFunctions::Replace(context, bam, z, aaa);
   EXPECT_TRUE(r4.is_null);
+  // Re-create context to clear the error from failed allocation.
+  UdfTestHarness::CloseContext(context);
+  context = UdfTestHarness::CreateTestContext(str_desc, v, nullptr, &pool);
 
   // Similar test for second overflow.  This tests overflowing on 
re-allocation.
   (*short_buf)[4095] = 'Z';
   StringVal bam2(static_cast<uint8_t*>(short_buf->data()), 
StringVal::MAX_LENGTH-2);
   auto r5 = StringFunctions::Replace(context, bam2, z, aaa);
   EXPECT_TRUE(r5.is_null);
+  // Re-create context to clear the error from failed allocation.
+  UdfTestHarness::CloseContext(context);
+  context = UdfTestHarness::CreateTestContext(str_desc, v, nullptr, &pool);
 
   // Finally, test expanding to exactly MAX_LENGTH
   // There are 4 Zs in giga4 (not including the trailing one, as we truncate 
that)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/hive-udf-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc
index a3e0fe6..19e2e63 100644
--- a/be/src/exprs/hive-udf-call.cc
+++ b/be/src/exprs/hive-udf-call.cc
@@ -229,7 +229,7 @@ Status 
HiveUdfCall::OpenEvaluator(FunctionContext::FunctionStateScope scope,
   RETURN_ERROR_IF_EXC(env);
   RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, 
&jni_ctx->executor));
 
-  RETURN_IF_ERROR(AllocateAnyVal(state, eval->mem_pool(), type_,
+  RETURN_IF_ERROR(AllocateAnyVal(state, eval->expr_perm_pool(), type_,
       "Could not allocate JNI output value", &jni_ctx->output_anyval));
   return Status::OK();
 }
@@ -327,7 +327,7 @@ StringVal HiveUdfCall::GetStringVal(
   DCHECK_EQ(type_.type, TYPE_STRING);
   StringVal result = *reinterpret_cast<StringVal*>(Evaluate(eval, row));
   if (result.is_null) return StringVal::null();
-  // Copy the string into a local allocation with the usual lifetime for expr 
results.
+  // Copy the string into a result allocation with the usual lifetime for expr 
results.
   // Needed because the UDF output buffer is owned by the Java UDF executor 
and may be
   // freed or reused by the next call into the Java UDF executor.
   FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/scalar-expr-evaluator.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-expr-evaluator.cc 
b/be/src/exprs/scalar-expr-evaluator.cc
index a327dd4..b1ace96 100644
--- a/be/src/exprs/scalar-expr-evaluator.cc
+++ b/be/src/exprs/scalar-expr-evaluator.cc
@@ -62,21 +62,21 @@ using namespace impala_udf;
 
 const char* ScalarExprEvaluator::LLVM_CLASS_NAME = 
"class.impala::ScalarExprEvaluator";
 
-ScalarExprEvaluator::ScalarExprEvaluator(const ScalarExpr& root, MemPool* 
mem_pool)
-  : mem_pool_(mem_pool),
-    root_(root) {
-}
+ScalarExprEvaluator::ScalarExprEvaluator(
+    const ScalarExpr& root, MemPool* expr_perm_pool, MemPool* 
expr_results_pool)
+  : expr_perm_pool_(expr_perm_pool), root_(root) {}
 
 ScalarExprEvaluator::~ScalarExprEvaluator() {
   DCHECK(!initialized_ || closed_);
 }
 
 Status ScalarExprEvaluator::Create(const ScalarExpr& root, RuntimeState* state,
-    ObjectPool* pool, MemPool* mem_pool, ScalarExprEvaluator** eval) {
-  *eval = pool->Add(new ScalarExprEvaluator(root, mem_pool));
+    ObjectPool* pool, MemPool* expr_perm_pool, MemPool* expr_results_pool,
+    ScalarExprEvaluator** eval) {
+  *eval = pool->Add(new ScalarExprEvaluator(root, expr_perm_pool, 
expr_results_pool));
   if (root.fn_ctx_idx_end_ > 0) {
     (*eval)->fn_ctxs_.resize(root.fn_ctx_idx_end_, nullptr);
-    (*eval)->CreateFnCtxs(state, root);
+    (*eval)->CreateFnCtxs(state, root, expr_perm_pool, expr_results_pool);
     DCHECK_EQ((*eval)->fn_ctxs_.size(), root.fn_ctx_idx_end_);
     for (FunctionContext* fn_ctx : (*eval)->fn_ctxs_) DCHECK(fn_ctx != 
nullptr);
     (*eval)->fn_ctxs_ptr_ = (*eval)->fn_ctxs_.data();
@@ -91,10 +91,11 @@ Status ScalarExprEvaluator::Create(const ScalarExpr& root, 
RuntimeState* state,
 }
 
 Status ScalarExprEvaluator::Create(const vector<ScalarExpr*>& exprs, 
RuntimeState* state,
-    ObjectPool* pool, MemPool* mem_pool, vector<ScalarExprEvaluator*>* evals) {
+    ObjectPool* pool, MemPool* expr_perm_pool, MemPool* expr_results_pool,
+    vector<ScalarExprEvaluator*>* evals) {
   for (const ScalarExpr* expr : exprs) {
     ScalarExprEvaluator* eval;
-    Status status = Create(*expr, state, pool, mem_pool, &eval);
+    Status status = Create(*expr, state, pool, expr_perm_pool, 
expr_results_pool, &eval);
     // Always add the evaluator to the vector so it can be cleaned up.
     evals->push_back(eval);
     RETURN_IF_ERROR(status);
@@ -102,12 +103,13 @@ Status ScalarExprEvaluator::Create(const 
vector<ScalarExpr*>& exprs, RuntimeStat
   return Status::OK();
 }
 
-void ScalarExprEvaluator::CreateFnCtxs(RuntimeState* state, const ScalarExpr& 
expr) {
+void ScalarExprEvaluator::CreateFnCtxs(RuntimeState* state, const ScalarExpr& 
expr,
+    MemPool* expr_perm_pool, MemPool* expr_results_pool) {
   const int fn_ctx_idx = expr.fn_ctx_idx();
   const bool has_fn_ctx = fn_ctx_idx != -1;
   vector<FunctionContext::TypeDesc> arg_types;
   for (const ScalarExpr* child : expr.children()) {
-    CreateFnCtxs(state, *child);
+    CreateFnCtxs(state, *child, expr_perm_pool, expr_results_pool);
     if (has_fn_ctx) 
arg_types.push_back(AnyValUtil::ColumnTypeToTypeDesc(child->type()));
   }
   if (has_fn_ctx) {
@@ -117,8 +119,8 @@ void ScalarExprEvaluator::CreateFnCtxs(RuntimeState* state, 
const ScalarExpr& ex
     DCHECK_GE(fn_ctx_idx, 0);
     DCHECK_LT(fn_ctx_idx, fn_ctxs_.size());
     DCHECK(fn_ctxs_[fn_ctx_idx] == nullptr);
-    fn_ctxs_[fn_ctx_idx] = FunctionContextImpl::CreateContext(
-        state, mem_pool_, return_type, arg_types, varargs_buffer_size);
+    fn_ctxs_[fn_ctx_idx] = FunctionContextImpl::CreateContext(state, 
expr_perm_pool,
+        expr_results_pool, return_type, arg_types, varargs_buffer_size);
   }
 }
 
@@ -151,8 +153,8 @@ void ScalarExprEvaluator::Close(RuntimeState* state) {
     delete fn_ctxs_[i];
   }
   fn_ctxs_.clear();
-  // Memory allocated by 'fn_ctx_' is still in 'mem_pool_'. It's the 
responsibility of
-  // the owner of 'mem_pool_' to free it.
+  // Memory allocated by 'fn_ctx_' is still in the MemPools. It's the 
responsibility of
+  // the owners of those pools to free it.
   closed_ = true;
 }
 
@@ -162,12 +164,15 @@ void ScalarExprEvaluator::Close(
 }
 
 Status ScalarExprEvaluator::Clone(ObjectPool* pool, RuntimeState* state,
-    MemPool* mem_pool, ScalarExprEvaluator** cloned_eval) const {
+    MemPool* expr_perm_pool, MemPool* expr_results_pool,
+    ScalarExprEvaluator** cloned_eval) const {
   DCHECK(initialized_);
   DCHECK(opened_);
-  *cloned_eval = pool->Add(new ScalarExprEvaluator(root_, mem_pool));
+  *cloned_eval = pool->Add(
+      new ScalarExprEvaluator(root_, expr_perm_pool, expr_results_pool));
   for (int i = 0; i < fn_ctxs_.size(); ++i) {
-    (*cloned_eval)->fn_ctxs_.push_back(fn_ctxs_[i]->impl()->Clone(mem_pool));
+    (*cloned_eval)->fn_ctxs_.push_back(
+        fn_ctxs_[i]->impl()->Clone(expr_perm_pool, expr_results_pool));
   }
   (*cloned_eval)->fn_ctxs_ptr_ = (*cloned_eval)->fn_ctxs_.data();
   (*cloned_eval)->is_clone_ = true;
@@ -178,46 +183,20 @@ Status ScalarExprEvaluator::Clone(ObjectPool* pool, 
RuntimeState* state,
 }
 
 Status ScalarExprEvaluator::Clone(ObjectPool* pool, RuntimeState* state,
-    MemPool* mem_pool, const vector<ScalarExprEvaluator*>& evals,
+    MemPool* expr_perm_pool, MemPool* expr_results_pool,
+    const vector<ScalarExprEvaluator*>& evals,
     vector<ScalarExprEvaluator*>* cloned_evals) {
   DCHECK(cloned_evals != nullptr);
   DCHECK(cloned_evals->empty());
   for (int i = 0; i < evals.size(); ++i) {
     ScalarExprEvaluator* cloned_eval;
-    RETURN_IF_ERROR(evals[i]->Clone(pool, state, mem_pool, &cloned_eval));
+    RETURN_IF_ERROR(
+        evals[i]->Clone(pool, state, expr_perm_pool, expr_results_pool, 
&cloned_eval));
     cloned_evals->push_back(cloned_eval);
   }
   return Status::OK();
 }
 
-bool ScalarExprEvaluator::HasLocalAllocations() const {
-  for (int i = 0; i < fn_ctxs_.size(); ++i) {
-    if (fn_ctxs_[i]->impl()->closed()) continue;
-    if (fn_ctxs_[i]->impl()->HasLocalAllocations()) return true;
-  }
-  return false;
-}
-
-bool ScalarExprEvaluator::HasLocalAllocations(
-    const vector<ScalarExprEvaluator*>& evals) {
-  for (int i = 0; i < evals.size(); ++i) {
-    if (evals[i]->HasLocalAllocations()) return true;
-  }
-  return false;
-}
-
-void ScalarExprEvaluator::FreeLocalAllocations() {
-  for (int i = 0; i < fn_ctxs_.size(); ++i) {
-    if (fn_ctxs_[i]->impl()->closed()) continue;
-    fn_ctxs_[i]->impl()->FreeLocalAllocations();
-  }
-}
-
-void ScalarExprEvaluator::FreeLocalAllocations(
-    const vector<ScalarExprEvaluator*>& evals) {
-  for (int i = 0; i < evals.size(); ++i) evals[i]->FreeLocalAllocations();
-}
-
 Status ScalarExprEvaluator::GetError(int start_idx, int end_idx) const {
   DCHECK(opened_);
   end_idx = end_idx == -1 ? fn_ctxs_.size() : end_idx;
@@ -241,9 +220,9 @@ Status ScalarExprEvaluator::GetConstValue(RuntimeState* 
state, const ScalarExpr&
 
   // A constant expression shouldn't have any SlotRefs expr in it.
   DCHECK_EQ(expr.GetSlotIds(), 0);
-  DCHECK(mem_pool_ != nullptr);
+  DCHECK(expr_perm_pool_ != nullptr);
   const ColumnType& result_type = expr.type();
-  RETURN_IF_ERROR(AllocateAnyVal(state, mem_pool_, result_type,
+  RETURN_IF_ERROR(AllocateAnyVal(state, expr_perm_pool_, result_type,
       "Could not allocate constant expression value", const_val));
 
   void* result = ScalarExprEvaluator::GetValue(expr, nullptr);
@@ -252,9 +231,10 @@ Status ScalarExprEvaluator::GetConstValue(RuntimeState* 
state, const ScalarExpr&
     StringVal* sv = reinterpret_cast<StringVal*>(*const_val);
     if (!sv->is_null && sv->len > 0) {
       // Make sure the memory is owned by this evaluator.
-      char* ptr_copy = 
reinterpret_cast<char*>(mem_pool_->TryAllocateUnaligned(sv->len));
+      char* ptr_copy =
+          
reinterpret_cast<char*>(expr_perm_pool_->TryAllocateUnaligned(sv->len));
       if (ptr_copy == nullptr) {
-        return mem_pool_->mem_tracker()->MemLimitExceeded(
+        return expr_perm_pool_->mem_tracker()->MemLimitExceeded(
             state, "Could not allocate constant string value", sv->len);
       }
       memcpy(ptr_copy, sv->ptr, sv->len);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/scalar-expr-evaluator.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-expr-evaluator.h 
b/be/src/exprs/scalar-expr-evaluator.h
index 5792d14..9a27951 100644
--- a/be/src/exprs/scalar-expr-evaluator.h
+++ b/be/src/exprs/scalar-expr-evaluator.h
@@ -71,19 +71,28 @@ class ScalarExprEvaluator {
   ~ScalarExprEvaluator();
 
   /// Creates an evaluator for the scalar expression tree rooted at 'expr' and 
all
-  /// FunctionContexts needed during evaluation. Allocations from this 
evaluator will
-  /// be from 'mem_pool'. The newly created evaluator will be stored in 'pool' 
and
-  /// returned in 'eval'. Returns error status on failure. Note that it's the
-  /// responsibility to call Close() on all created evaluators even if this 
function
-  /// returns error on initialization failure.
+  /// FunctionContexts needed during evaluation.
+  ///
+  /// Permanent allocations (i.e. those that must live until the evaluator is 
closed) come
+  /// from 'expr_perm_pool'. Allocations that may contain expr results (i.e. 
the
+  /// results of GetValue(), GetStringVal(), etc) come from 
'expr_results_pool'. Lifetime
+  /// of memory in 'expr_results_pool' is managed by the owner of the pool and 
may be freed
+  /// by the owner at any time except when the evaluator is in the middle of 
evaluating
+  /// the expression. These pools can be shared between evaluators (so long as 
the
+  /// required memory lifetimes are compatible) but cannot be shared between 
threads
+  /// since MemPools are not thread-safe.
+  ///
+  /// Note that the caller is responsible for calling Close() on all evaluators 
even if this
+  /// function returns error status on initialization failure.
   static Status Create(const ScalarExpr& expr, RuntimeState* state, 
ObjectPool* pool,
-      MemPool* mem_pool, ScalarExprEvaluator** eval) WARN_UNUSED_RESULT;
+      MemPool* expr_perm_pool, MemPool* expr_results_pool,
+      ScalarExprEvaluator** eval) WARN_UNUSED_RESULT;
 
   /// Convenience function for creating multiple ScalarExprEvaluators. The 
evaluators
   /// are returned in 'evals'.
   static Status Create(const std::vector<ScalarExpr*>& exprs, RuntimeState* 
state,
-      ObjectPool* pool, MemPool* mem_pool, std::vector<ScalarExprEvaluator*>* 
evals)
-      WARN_UNUSED_RESULT;
+      ObjectPool* pool, MemPool* expr_perm_pool, MemPool* expr_results_pool,
+      std::vector<ScalarExprEvaluator*>* evals) WARN_UNUSED_RESULT;
 
   /// Initializes the ScalarExprEvaluator on all nodes in the ScalarExpr tree. 
This is
   /// also the location in which constant arguments to functions are computed. 
Does not
@@ -105,21 +114,22 @@ class ScalarExprEvaluator {
 
   /// Creates a copy of this ScalarExprEvaluator. Open() must be called first. 
The copy
   /// contains clones of each FunctionContext, which share the fragment-local 
state of the
-  /// original one but have their own FreePool and thread-local state. This 
should be used
+  /// original one but have their own memory and thread-local state. This 
should be used
   /// to create an ScalarExprEvaluator for each execution thread that needs to 
evaluate
-  /// 'root_'. All allocations will be from 'mem_pool' so callers should use 
different
-  /// MemPool for evaluators in different threads. Note that clones are 
considered opened.
-  /// The cloned ScalarExprEvaluator cannot be used after the original 
ScalarExprEvaluator
-  /// is destroyed because it may reference fragment-local state from the 
original.
+  /// 'root_'. 'expr_perm_pool' and 'expr_results_pool' are used for 
allocations so callers
+  /// must use different MemPools for evaluators in different threads. Note 
that clones
+  /// are considered opened. The cloned ScalarExprEvaluator cannot be used 
after the
+  /// original ScalarExprEvaluator is destroyed because it may reference 
fragment-local
+  /// state from the original.
   /// TODO: IMPALA-4743: Evaluate input arguments in ScalarExpr::Init() and 
store them
   /// in ScalarExpr.
-  Status Clone(ObjectPool* pool, RuntimeState* state, MemPool* mem_pool,
-      ScalarExprEvaluator** new_eval) const WARN_UNUSED_RESULT;
+  Status Clone(ObjectPool* pool, RuntimeState* state, MemPool* expr_perm_pool,
+      MemPool* expr_results_pool, ScalarExprEvaluator** new_eval) const 
WARN_UNUSED_RESULT;
 
   /// Convenience functions for cloning multiple ScalarExprEvaluators. The 
newly
   /// created evaluators are appended to 'new_evals'.
-  static Status Clone(ObjectPool* pool, RuntimeState* state, MemPool* mem_pool,
-      const std::vector<ScalarExprEvaluator*>& evals,
+  static Status Clone(ObjectPool* pool, RuntimeState* state, MemPool* 
expr_perm_pool,
+      MemPool* expr_results_pool, const std::vector<ScalarExprEvaluator*>& 
evals,
       std::vector<ScalarExprEvaluator*>* new_evals) WARN_UNUSED_RESULT;
 
   /// If 'expr' is constant, evaluates it with no input row argument and 
returns the
@@ -166,15 +176,6 @@ class ScalarExprEvaluator {
   void PrintValue(void* value, std::string* str);
   void PrintValue(void* value, std::stringstream* stream);
 
-  /// Returns true if any of the expression contexts in the array has local 
allocations.
-  static bool HasLocalAllocations(const std::vector<ScalarExprEvaluator*>& 
evals);
-  bool HasLocalAllocations() const;
-
-  /// Frees all local allocations made by fn_ctxs_. This can be called when 
result
-  /// data from this context is no longer needed.
-  void FreeLocalAllocations();
-  static void FreeLocalAllocations(const std::vector<ScalarExprEvaluator*>& 
evals);
-
   /// Get the number of digits after the decimal that should be displayed for 
this value.
   /// Returns -1 if no scale has been specified (currently the scale is only 
set for
   /// doubles set by RoundUpTo). GetValue() must have already been called.
@@ -184,7 +185,7 @@ class ScalarExprEvaluator {
   bool opened() const { return opened_; }
   bool closed() const { return closed_; }
   bool is_clone() const { return is_clone_; }
-  MemPool* mem_pool() const { return mem_pool_; }
+  MemPool* expr_perm_pool() const { return expr_perm_pool_; }
 
   /// The builtin functions are not called from anywhere in the code and the
   /// symbols are therefore not included in the binary. We call these functions
@@ -220,9 +221,10 @@ class ScalarExprEvaluator {
   /// to access the correct FunctionContext.
   FunctionContext** fn_ctxs_ptr_ = nullptr;
 
-  /// Pointer to the MemPool which all allocations (including fn_ctxs_') come 
from.
-  /// Owned by the exec node which owns this evaluator.
-  MemPool* mem_pool_;
+  /// Pointer to the MemPool which all permanent allocations (including those 
from
+  /// 'fn_ctxs_') come from. Owned by the exec node or data sink which owns 
this
+  /// evaluator.
+  MemPool* const expr_perm_pool_;
 
   /// The expr tree which this evaluator is for.
   const ScalarExpr& root_;
@@ -246,13 +248,14 @@ class ScalarExprEvaluator {
   /// TODO: move this to Expr initialization after IMPALA-4743 is fixed.
   int output_scale_ = -1;
 
-  ScalarExprEvaluator(const ScalarExpr& root, MemPool* mem_pool);
+  ScalarExprEvaluator(const ScalarExpr& root, MemPool* expr_perm_pool,
+      MemPool* expr_results_pool);
 
   /// Walks the expression tree 'expr' and fills in 'fn_ctxs_' for all Expr 
nodes
   /// which need FunctionContext.
-  void CreateFnCtxs(RuntimeState* state, const ScalarExpr& expr);
+  void CreateFnCtxs(RuntimeState* state, const ScalarExpr& expr, MemPool* 
expr_perm_pool,
+      MemPool* expr_results_pool);
 };
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/scalar-fn-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-fn-call.cc b/be/src/exprs/scalar-fn-call.cc
index 73b497d..82b886c 100644
--- a/be/src/exprs/scalar-fn-call.cc
+++ b/be/src/exprs/scalar-fn-call.cc
@@ -170,7 +170,7 @@ Status 
ScalarFnCall::OpenEvaluator(FunctionContext::FunctionStateScope scope,
     vector<AnyVal*>* input_vals = fn_ctx->impl()->staging_input_vals();
     for (int i = 0; i < NumFixedArgs(); ++i) {
       AnyVal* input_val;
-      RETURN_IF_ERROR(AllocateAnyVal(state, eval->mem_pool(), 
children_[i]->type(),
+      RETURN_IF_ERROR(AllocateAnyVal(state, eval->expr_perm_pool(), 
children_[i]->type(),
           "Could not allocate expression value", &input_val));
       input_vals->push_back(input_val);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exprs/string-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/string-functions-ir.cc 
b/be/src/exprs/string-functions-ir.cc
index d0c8679..49bc8c1 100644
--- a/be/src/exprs/string-functions-ir.cc
+++ b/be/src/exprs/string-functions-ir.cc
@@ -294,7 +294,7 @@ StringVal StringFunctions::Replace(FunctionContext* 
context, const StringVal& st
   }
 
   StringVal result(context, buffer_space);
-  // If the result went over MAX_LENGTH, we can get a null result back
+  // result may be NULL if we went over MAX_LENGTH or the allocation failed.
   if (UNLIKELY(result.is_null)) return result;
 
   uint8_t* ptr = result.ptr;
@@ -333,23 +333,22 @@ StringVal StringFunctions::Replace(FunctionContext* 
context, const StringVal& st
       // Also no overflow: min_output <= MAX_LENGTH and delta <= MAX_LENGTH - 1
       const int64_t space_needed = min_output + delta;
       if (UNLIKELY(space_needed > buffer_space)) {
-        // Double at smaller sizes, but don't grow more than a megabyte a
-        // time at larger sizes.  Reasoning: let the allocator do its job
-        // and don't depend on policy here.
-        static_assert(StringVal::MAX_LENGTH % (1 << 20) == 0,
-            "Math requires StringVal::MAX_LENGTH to be a multiple of 1MB");
-        // Must compute next power of two using 64-bit math to avoid signed 
overflow
-        // The following DCHECK was supposed to be a static assertion, but 
C++11 is
-        // broken and doesn't declare std::min or std::max to be constexpr.  
Fix this
-        // when eventually the minimum supported standard is raised to at 
least C++14
-        DCHECK_EQ(static_cast<int>(std::min<int64_t>(
-            BitUtil::RoundUpToPowerOfTwo(StringVal::MAX_LENGTH+1),
-            StringVal::MAX_LENGTH + (1 << 20))),
-            StringVal::MAX_LENGTH + (1 << 20));
-        buffer_space = static_cast<int>(std::min<int64_t>(
-            BitUtil::RoundUpToPowerOfTwo(space_needed),
-            space_needed + (1 << 20)));
-        if (UNLIKELY(!result.Resize(context, buffer_space))) return 
StringVal::null();
+        // Check to see if we can allocate a large enough buffer.
+        if (space_needed > StringVal::MAX_LENGTH) {
+          context->SetError(
+              "String length larger than allowed limit of 1 GB character 
data.");
+          return StringVal::null();
+        }
+        // Double the buffer size whenever it fills up to amortise cost of 
resizing.
+        // Must compute next power of two using 64-bit math to avoid signed 
overflow.
+        buffer_space = min<int>(StringVal::MAX_LENGTH,
+            static_cast<int>(BitUtil::RoundUpToPowerOfTwo(space_needed)));
+
+        // Give up if the allocation fails or we hit an error. This prevents 
us from
+        // continuing to blow past the mem limit.
+        if (UNLIKELY(!result.Resize(context, buffer_space) || 
context->has_error())) {
+          return StringVal::null();
+        }
         // Don't forget to move the pointer
         ptr = result.ptr + bytes_produced;
       }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index ad9f074..9c655ec 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -38,7 +38,6 @@ add_library(Runtime
   disk-io-mgr-scan-range.cc
   disk-io-mgr-stress.cc
   exec-env.cc
-  free-pool.cc
   fragment-instance-state.cc
   hbase-table.cc
   hbase-table-factory.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc 
b/be/src/runtime/data-stream-sender.cc
index f39a595..dc26a23 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -390,7 +390,8 @@ Status DataStreamSender::Prepare(RuntimeState* state, 
MemTracker* parent_mem_tra
   state_ = state;
   SCOPED_TIMER(profile_->total_time_counter());
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_exprs_, state,
-      state->obj_pool(), expr_mem_pool(), &partition_expr_evals_));
+      state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(),
+      &partition_expr_evals_));
   bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
   uncompressed_bytes_counter_ =
       ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
@@ -479,7 +480,7 @@ Status DataStreamSender::Send(RuntimeState* state, 
RowBatch* batch) {
     }
   }
   COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());
-  ScalarExprEvaluator::FreeLocalAllocations(partition_expr_evals_);
+  expr_results_pool_->Clear();
   RETURN_IF_ERROR(state->CheckQueryState());
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc 
b/be/src/runtime/data-stream-test.cc
index 8e85894..79a97b6 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -307,7 +307,8 @@ class DataStreamTest : public testing::Test {
     ordering_exprs_.push_back(lhs_slot);
     less_than_ = obj_pool_.Add(new TupleRowComparator(ordering_exprs_,
         is_asc_, nulls_first_));
-    ASSERT_OK(less_than_->Open(&obj_pool_, runtime_state_.get(), 
mem_pool_.get()));
+    ASSERT_OK(less_than_->Open(
+        &obj_pool_, runtime_state_.get(), mem_pool_.get(), mem_pool_.get()));
   }
 
   // Create batch_, but don't fill it with data yet. Assumes we created 
row_desc_.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/descriptors.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index 0f97f76..ef553fb 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -486,7 +486,7 @@ Status DescriptorTbl::CreatePartKeyExprs(
     // TODO: RowDescriptor should arguably be optional in Prepare for known 
literals.
     // Partition exprs are not used in the codegen case. Don't codegen them.
     RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_key_value_exprs, 
nullptr,
-        pool, nullptr, &part_desc->partition_key_value_evals_));
+        pool, nullptr, nullptr, &part_desc->partition_key_value_evals_));
     RETURN_IF_ERROR(ScalarExprEvaluator::Open(
         part_desc->partition_key_value_evals_, nullptr));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/free-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/free-pool.cc b/be/src/runtime/free-pool.cc
deleted file mode 100644
index df0569d..0000000
--- a/be/src/runtime/free-pool.cc
+++ /dev/null
@@ -1,28 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/free-pool.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-#ifndef NDEBUG
-
-AtomicInt32 FreePool::alloc_counts_(0);
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/free-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/free-pool.h b/be/src/runtime/free-pool.h
index f4a3e4b..52c7137 100644
--- a/be/src/runtime/free-pool.h
+++ b/be/src/runtime/free-pool.h
@@ -25,13 +25,11 @@
 #include <sstream>
 #include <unordered_map>
 
-#include "common/atomic.h"
 #include "common/logging.h"
 #include "gutil/dynamic_annotations.h"
 #include "runtime/mem-pool.h"
 #include "util/bit-util.h"
 
-DECLARE_int32(stress_free_pool_alloc);
 DECLARE_bool(disable_mem_pools);
 
 namespace impala {
@@ -62,12 +60,6 @@ class FreePool {
   /// Allocates a buffer of size between [0, 2^62 - 1 - sizeof(FreeListNode)] 
bytes.
   uint8_t* Allocate(const int64_t requested_size) {
     DCHECK_GE(requested_size, 0);
-#ifndef NDEBUG
-    if (FLAGS_stress_free_pool_alloc > 0 &&
-        (alloc_counts_.Add(1) % FLAGS_stress_free_pool_alloc) == 0) {
-      return nullptr;
-    }
-#endif
     /// Return a non-nullptr dummy pointer. nullptr is reserved for failures.
     if (UNLIKELY(requested_size == 0)) return mem_pool_->EmptyAllocPtr();
     ++net_allocations_;
@@ -140,12 +132,6 @@ class FreePool {
   /// nullptr will be returned on allocation failure. It's the caller's 
responsibility to
   /// free the memory buffer pointed to by "ptr" in this case.
   uint8_t* Reallocate(uint8_t* ptr, int64_t size) {
-#ifndef NDEBUG
-    if (FLAGS_stress_free_pool_alloc > 0 &&
-        (alloc_counts_.Add(1) % FLAGS_stress_free_pool_alloc) == 0) {
-      return nullptr;
-    }
-#endif
     if (UNLIKELY(ptr == nullptr || ptr == mem_pool_->EmptyAllocPtr())) return 
Allocate(size);
     if (FLAGS_disable_mem_pools) {
       return reinterpret_cast<uint8_t*>(realloc(reinterpret_cast<void*>(ptr), 
size));
@@ -258,12 +244,6 @@ class FreePool {
 
   /// Diagnostic counter that tracks (# Allocates - # Frees)
   int64_t net_allocations_;
-
-#ifndef NDEBUG
-  /// Counter for tracking the number of allocations. Used only if the
-  /// the stress flag FLAGS_stress_free_pool_alloc is set.
-  static AtomicInt32 alloc_counts_;
-#endif
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc 
b/be/src/runtime/sorted-run-merger.cc
index 673d6c3..8f5ede9 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -172,9 +172,6 @@ Status SortedRunMerger::GetNext(RowBatch* output_batch, 
bool* eos) {
     output_batch->CommitLastRow();
     RETURN_IF_ERROR(AdvanceMinRow(output_batch));
   }
-  // Free local allocations made by comparator_.Less();
-  comparator_.FreeLocalAllocations();
-
   *eos = min_heap_.empty();
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 16984ca..b85e61b 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -541,8 +541,8 @@ class Sorter::TupleIterator {
 /// instance to check for cancellation during an in-memory sort.
 class Sorter::TupleSorter {
  public:
-  TupleSorter(const TupleRowComparator& comparator, int64_t page_size, int 
tuple_size,
-      RuntimeState* state);
+  TupleSorter(Sorter* parent, const TupleRowComparator& comparator, int64_t 
page_size,
+      int tuple_size, RuntimeState* state);
 
   ~TupleSorter();
 
@@ -555,6 +555,8 @@ class Sorter::TupleSorter {
  private:
   static const int INSERTION_THRESHOLD = 16;
 
+  Sorter* const parent_;
+
   /// Size of the tuples in memory.
   const int tuple_size_;
 
@@ -562,7 +564,7 @@ class Sorter::TupleSorter {
   const TupleRowComparator& comparator_;
 
   /// Number of times comparator_.Less() can be invoked again before
-  /// comparator_.FreeLocalAllocations() needs to be called.
+  /// expr_results_pool_.Clear() needs to be called.
   int num_comparisons_till_free_;
 
   /// Runtime state instance to check for cancellation. Not owned.
@@ -581,7 +583,7 @@ class Sorter::TupleSorter {
   /// high: Mersenne Twister should be more than adequate.
   mt19937_64 rng_;
 
-  /// Wrapper around comparator_.Less(). Also call 
comparator_.FreeLocalAllocations()
+  /// Wrapper around comparator_.Less(). Also call expr_results_pool_.Clear()
   /// on every 'state_->batch_size()' invocations of comparator_.Less(). 
Returns true
   /// if 'lhs' is less than 'rhs'.
   bool Less(const TupleRow* lhs, const TupleRow* rhs);
@@ -1251,9 +1253,10 @@ void Sorter::TupleIterator::PrevPage(Sorter::Run* run, 
int tuple_size) {
   tuple_ = run->fixed_len_pages_[page_index_].data() + last_tuple_page_offset;
 }
 
-Sorter::TupleSorter::TupleSorter(const TupleRowComparator& comp, int64_t 
page_size,
-    int tuple_size, RuntimeState* state)
-  : tuple_size_(tuple_size),
+Sorter::TupleSorter::TupleSorter(Sorter* parent, const TupleRowComparator& 
comp,
+    int64_t page_size, int tuple_size, RuntimeState* state)
+  : parent_(parent),
+    tuple_size_(tuple_size),
     comparator_(comp),
     num_comparisons_till_free_(state->batch_size()),
     state_(state) {
@@ -1270,7 +1273,7 @@ bool Sorter::TupleSorter::Less(const TupleRow* lhs, const 
TupleRow* rhs) {
   --num_comparisons_till_free_;
   DCHECK_GE(num_comparisons_till_free_, 0);
   if (UNLIKELY(num_comparisons_till_free_ == 0)) {
-    comparator_.FreeLocalAllocations();
+    parent_->expr_results_pool_.Clear();
     num_comparisons_till_free_ = state_->batch_size();
   }
   return comparator_.Less(lhs, rhs);
@@ -1463,14 +1466,17 @@ inline void Sorter::TupleSorter::Swap(Tuple* left, 
Tuple* right, Tuple* swap_tup
   memcpy(right, swap_tuple, tuple_size);
 }
 
-Sorter::Sorter(const TupleRowComparator& compare_less_than,
+Sorter::Sorter(const std::vector<ScalarExpr*>& ordering_exprs,
+      const std::vector<bool>& is_asc_order, const std::vector<bool>& 
nulls_first,
     const vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* 
output_row_desc,
     MemTracker* mem_tracker, BufferPool::ClientHandle* buffer_pool_client,
     int64_t page_len, RuntimeProfile* profile, RuntimeState* state, int 
node_id,
     bool enable_spilling)
   : node_id_(node_id),
     state_(state),
-    compare_less_than_(compare_less_than),
+    expr_perm_pool_(mem_tracker),
+    expr_results_pool_(mem_tracker),
+    compare_less_than_(ordering_exprs, is_asc_order, nulls_first),
     in_mem_tuple_sorter_(NULL),
     buffer_pool_client_(buffer_pool_client),
     page_len_(page_len),
@@ -1495,7 +1501,7 @@ Sorter::~Sorter() {
   DCHECK(merge_output_run_ == NULL);
 }
 
-Status Sorter::Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) {
+Status Sorter::Prepare(ObjectPool* obj_pool) {
   DCHECK(in_mem_tuple_sorter_ == NULL) << "Already prepared";
   // Page byte offsets are packed into uint32_t values, which limits the 
supported
   // page size.
@@ -1513,7 +1519,7 @@ Status Sorter::Prepare(ObjectPool* obj_pool, MemPool* 
expr_mem_pool) {
         PrettyPrinter::Print(state_->query_options().max_row_size, 
TUnit::BYTES));
   }
   has_var_len_slots_ = sort_tuple_desc->HasVarlenSlots();
-  in_mem_tuple_sorter_.reset(new TupleSorter(compare_less_than_, page_len_,
+  in_mem_tuple_sorter_.reset(new TupleSorter(this, compare_less_than_, 
page_len_,
       sort_tuple_desc->byte_size(), state_));
 
   if (enable_spilling_) {
@@ -1528,25 +1534,26 @@ Status Sorter::Prepare(ObjectPool* obj_pool, MemPool* 
expr_mem_pool) {
   run_sizes_ = ADD_SUMMARY_STATS_COUNTER(profile_, "NumRowsPerRun", 
TUnit::UNIT);
 
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(sort_tuple_exprs_, state_, 
obj_pool,
-      expr_mem_pool, &sort_tuple_expr_evals_));
+      &expr_perm_pool_, &expr_results_pool_, &sort_tuple_expr_evals_));
   return Status::OK();
 }
 
+Status Sorter::Codegen(RuntimeState* state) {
+  return compare_less_than_.Codegen(state);
+}
+
 Status Sorter::Open() {
   DCHECK(in_mem_tuple_sorter_ != NULL) << "Not prepared";
   DCHECK(unsorted_run_ == NULL) << "Already open";
+  RETURN_IF_ERROR(compare_less_than_.Open(&obj_pool_, state_, &expr_perm_pool_,
+      &expr_results_pool_));
   TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0];
-  unsorted_run_ = obj_pool_.Add(new Run(this, sort_tuple_desc, true));
+  unsorted_run_ = run_pool_.Add(new Run(this, sort_tuple_desc, true));
   RETURN_IF_ERROR(unsorted_run_->Init());
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(sort_tuple_expr_evals_, state_));
   return Status::OK();
 }
 
-void Sorter::FreeLocalAllocations() {
-  compare_less_than_.FreeLocalAllocations();
-  ScalarExprEvaluator::FreeLocalAllocations(sort_tuple_expr_evals_);
-}
-
 int64_t Sorter::ComputeMinReservation() {
   // Must be kept in sync with SortNode.computeNodeResourceProfile() in fe.
   int min_buffers_required = enable_spilling_ ? MIN_BUFFERS_PER_MERGE : 1;
@@ -1572,16 +1579,20 @@ Status Sorter::AddBatch(RowBatch* batch) {
       RETURN_IF_ERROR(SortCurrentInputRun());
       RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
       unsorted_run_ =
-          obj_pool_.Add(new Run(this, 
output_row_desc_->tuple_descriptors()[0], true));
+          run_pool_.Add(new Run(this, 
output_row_desc_->tuple_descriptors()[0], true));
       RETURN_IF_ERROR(unsorted_run_->Init());
     }
   }
+  // Clear any temporary allocations made while materializing the sort tuples.
+  expr_results_pool_.Clear();
   return Status::OK();
 }
 
 Status Sorter::AddBatchNoSpill(RowBatch* batch, int start_index, int* 
num_processed) {
   DCHECK(batch != nullptr);
   RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index, 
num_processed));
+  // Clear any temporary allocations made while materializing the sort tuples.
+  expr_results_pool_.Clear();
   return Status::OK();
 }
 
@@ -1617,7 +1628,10 @@ Status Sorter::GetNext(RowBatch* output_batch, bool* 
eos) {
     DCHECK(sorted_runs_.back()->is_pinned());
     return sorted_runs_.back()->GetNext<false>(output_batch, eos);
   } else {
-    return merger_->GetNext(output_batch, eos);
+    RETURN_IF_ERROR(merger_->GetNext(output_batch, eos));
+    // Clear any temporary allocations made by the merger.
+    expr_results_pool_.Clear();
+    return Status::OK();
   }
 }
 
@@ -1626,13 +1640,16 @@ void Sorter::Reset() {
   merger_.reset();
   // Free resources from the current runs.
   CleanupAllRuns();
-  obj_pool_.Clear();
+  compare_less_than_.Close(state_);
 }
 
 void Sorter::Close(RuntimeState* state) {
   CleanupAllRuns();
-  obj_pool_.Clear();
+  compare_less_than_.Close(state);
   ScalarExprEvaluator::Close(sort_tuple_expr_evals_, state);
+  expr_perm_pool_.FreeAll();
+  expr_results_pool_.FreeAll();
+  obj_pool_.Clear();
 }
 
 void Sorter::CleanupAllRuns() {
@@ -1642,6 +1659,7 @@ void Sorter::CleanupAllRuns() {
   unsorted_run_ = NULL;
   if (merge_output_run_ != NULL) merge_output_run_->CloseAllPages();
   merge_output_run_ = NULL;
+  run_pool_.Clear();
 }
 
 Status Sorter::SortCurrentInputRun() {
@@ -1683,7 +1701,7 @@ Status Sorter::MergeIntermediateRuns() {
     // intermediate merges.
     // TODO: this isn't optimal: we could defer creating the merged run if we 
have
     // reliable reservations (IMPALA-3200).
-    merge_output_run_ = obj_pool_.Add(
+    merge_output_run_ = run_pool_.Add(
         new Run(this, output_row_desc_->tuple_descriptors()[0], false));
     RETURN_IF_ERROR(merge_output_run_->Init());
     RETURN_IF_ERROR(CreateMerger(num_runs_to_merge));
@@ -1754,6 +1772,8 @@ Status Sorter::ExecuteIntermediateMerge(Sorter::Run* 
merged_run) {
     // Copy rows into the new run until done.
     int num_copied;
     RETURN_IF_CANCELLED(state_);
+    // Clear any temporary allocations made by the merger.
+    expr_results_pool_.Clear();
     RETURN_IF_ERROR(merger_->GetNext(&intermediate_merge_batch, &eos));
     RETURN_IF_ERROR(
         merged_run->AddIntermediateBatch(&intermediate_merge_batch, 0, 
&num_copied));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/sorter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index 5e7240b..2527958 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -93,18 +93,18 @@ class RowBatch;
 class Sorter {
  public:
   /// 'sort_tuple_exprs' are the slot exprs used to materialize the tuples to 
be
-  /// sorted. 'compare_less_than' is a comparator for the sort tuples (returns 
true if
-  /// lhs < rhs). 'merge_batch_size_' is the size of the batches created to 
provide rows
-  /// to the merger and retrieve rows from an intermediate merger. 'node_id' 
is the ID of
-  /// the exec node using the sorter for error reporting. 'enable_spilling' 
should be set
-  /// to false to reduce the number of requested buffers if the caller will use
-  /// AddBatchNoSpill().
+  /// sorted. 'ordering_exprs', 'is_asc_order' and 'nulls_first' are parameters
+  /// for the comparator for the sort tuples.
+  /// 'node_id' is the ID of the exec node using the sorter for error 
reporting.
+  /// 'enable_spilling' should be set to false to reduce the number of 
requested buffers
+  /// if the caller will use AddBatchNoSpill().
   ///
   /// The Sorter assumes that it has exclusive use of the client's
   /// reservations for sorting, and may increase the size of the client's 
reservation.
   /// The caller is responsible for ensuring that the minimum reservation 
(returned from
   /// ComputeMinReservation()) is available.
-  Sorter(const TupleRowComparator& compare_less_than,
+  Sorter(const std::vector<ScalarExpr*>& ordering_exprs,
+      const std::vector<bool>& is_asc_order, const std::vector<bool>& 
nulls_first,
       const std::vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* 
output_row_desc,
       MemTracker* mem_tracker, BufferPool::ClientHandle* client, int64_t 
page_len,
       RuntimeProfile* profile, RuntimeState* state, int node_id,
@@ -113,8 +113,11 @@ class Sorter {
 
   /// Initial set-up of the sorter for execution.
   /// The evaluators for 'sort_tuple_exprs_' will be created and stored in 
'obj_pool'.
-  /// All allocation from the evaluators will be from 'expr_mem_pool'.
-  Status Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) 
WARN_UNUSED_RESULT;
+  Status Prepare(ObjectPool* obj_pool) WARN_UNUSED_RESULT;
+
+  /// Do codegen for the Sorter. Called after Prepare() if codegen is desired. 
Returns OK
+  /// if successful or a Status describing the reason why Codegen failed 
otherwise.
+  Status Codegen(RuntimeState* state);
 
   /// Opens the sorter for adding rows and initializes the evaluators for 
materializing
   /// the tuples. Must be called after Prepare() or Reset() and before calling 
AddBatch().
@@ -137,9 +140,6 @@ class Sorter {
   /// Get the next batch of sorted output rows from the sorter.
   Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT;
 
-  /// Free any local allocations made when materializing and sorting the 
tuples.
-  void FreeLocalAllocations();
-
   /// Resets all internal state like ExecNode::Reset().
   /// Init() must have been called, AddBatch()/GetNext()/InputDone()
   /// may or may not have been called.
@@ -196,8 +196,15 @@ class Sorter {
   /// Runtime state instance used to check for cancellation. Not owned.
   RuntimeState* const state_;
 
+  /// MemPool for allocating data structures used by expression evaluators in 
the sorter.
+  MemPool expr_perm_pool_;
+
+  /// MemPool for allocations that hold results of expression evaluation in 
the sorter.
+  /// Cleared periodically during sorting to prevent memory accumulating.
+  MemPool expr_results_pool_;
+
   /// In memory sorter and less-than comparator.
-  const TupleRowComparator& compare_less_than_;
+  TupleRowComparator compare_less_than_;
   boost::scoped_ptr<TupleSorter> in_mem_tuple_sorter_;
 
   /// Client used to allocate pages from the buffer pool. Not owned.
@@ -251,7 +258,7 @@ class Sorter {
   Run* merge_output_run_;
 
   /// Pool of owned Run objects. Maintains Run objects across non-freeing 
Reset() calls.
-  ObjectPool obj_pool_;
+  ObjectPool run_pool_;
 
   /// END: Members that must be Reset()
   /////////////////////////////////////////
@@ -259,6 +266,9 @@ class Sorter {
   /// Runtime profile and counters for this sorter instance.
   RuntimeProfile* profile_;
 
+  /// Pool of objects (e.g. exprs) that are not freed during Reset() calls.
+  ObjectPool obj_pool_;
+
   /// Number of initial runs created.
   RuntimeProfile::Counter* initial_runs_counter_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/runtime/tuple.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc
index 86f199d..252175f 100644
--- a/be/src/runtime/tuple.cc
+++ b/be/src/runtime/tuple.cc
@@ -233,8 +233,7 @@ void Tuple::MaterializeExprs(TupleRow* row, const 
TupleDescriptor& desc,
 }
 
 // Codegens an unrolled version of MaterializeExprs(). Uses codegen'd exprs 
and slot
-// writes. If 'pool' is non-NULL, string data is copied into it. Note that the 
generated
-// function ignores its 'pool' arg; instead we hardcode the pointer in the IR.
+// writes. If 'pool' is non-NULL, string data is copied into it.
 //
 // Example IR for materializing a string column with non-NULL 'pool':
 //

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 54b78a5..b7a5e81 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -206,7 +206,7 @@ 
Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
     exprs.push_back(expr);
     ScalarExprEvaluator* eval;
     status = ScalarExprEvaluator::Create(*expr, &state, &obj_pool, 
&expr_mem_pool,
-        &eval);
+        &expr_mem_pool, &eval);
     evals.push_back(eval);
     if (!status.ok()) goto error;
   }


Reply via email to