http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 83232d2..20ee5e9 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -27,8 +27,8 @@
 #include "exec/hash-table.inline.h"
 #include "exprs/agg-fn-evaluator.h"
 #include "exprs/anyval-util.h"
-#include "exprs/expr-context.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "exprs/slot-ref.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/buffered-tuple-stream.inline.h"
@@ -104,9 +104,10 @@ PartitionedAggregationNode::PartitionedAggregationNode(
     ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
   : ExecNode(pool, tnode, descs),
     intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
-    intermediate_tuple_desc_(NULL),
+    intermediate_tuple_desc_(descs.GetTupleDescriptor(intermediate_tuple_id_)),
+    intermediate_row_desc_(pool->Add(new RowDescriptor(intermediate_tuple_desc_, false))),
     output_tuple_id_(tnode.agg_node.output_tuple_id),
-    output_tuple_desc_(NULL),
+    output_tuple_desc_(descs.GetTupleDescriptor(output_tuple_id_)),
     needs_finalize_(tnode.agg_node.need_finalize),
     is_streaming_preagg_(tnode.agg_node.use_streaming_preaggregation),
     needs_serialize_(false),
@@ -137,7 +138,7 @@ PartitionedAggregationNode::PartitionedAggregationNode(
     partition_pool_(new ObjectPool()) {
   DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
   if (is_streaming_preagg_) {
-    DCHECK(conjunct_ctxs_.empty()) << "Preaggs have no conjuncts";
+    DCHECK(conjunct_evals_.empty()) << "Preaggs have no conjuncts";
     DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping";
     DCHECK(limit_ == -1) << "Preaggs have no limits";
   }
@@ -145,24 +146,35 @@ PartitionedAggregationNode::PartitionedAggregationNode(
 
 Status PartitionedAggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-  RETURN_IF_ERROR(
-      Expr::CreateExprTrees(pool_, tnode.agg_node.grouping_exprs, &grouping_expr_ctxs_));
-  for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
-    AggFnEvaluator* evaluator;
-    RETURN_IF_ERROR(
-        AggFnEvaluator::Create(pool_, tnode.agg_node.aggregate_functions[i], &evaluator));
-    aggregate_evaluators_.push_back(evaluator);
-    ExprContext* const* agg_expr_ctxs;
-    if (evaluator->input_expr_ctxs().size() > 0) {
-      agg_expr_ctxs = evaluator->input_expr_ctxs().data();
-    } else {
-      // Some aggregate functions have no input expressions and therefore no ExprContext
-      // (e.g. count(*)). In those cases, 'agg_expr_ctxs_' will contain NULL for that
-      // entry.
-      DCHECK(evaluator->agg_op() == AggFnEvaluator::OTHER || evaluator->is_count_star());
-      agg_expr_ctxs = NULL;
-    }
-    agg_expr_ctxs_.push_back(agg_expr_ctxs);
+
+  DCHECK(intermediate_tuple_desc_ != nullptr);
+  DCHECK(output_tuple_desc_ != nullptr);
+  DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size());
+  const RowDescriptor& row_desc = child(0)->row_desc();
+  RETURN_IF_ERROR(ScalarExpr::Create(tnode.agg_node.grouping_exprs, row_desc,
+      state, &grouping_exprs_));
+
+  // Construct build exprs from intermediate_row_desc_
+  for (int i = 0; i < grouping_exprs_.size(); ++i) {
+    SlotDescriptor* desc = intermediate_tuple_desc_->slots()[i];
+    DCHECK(desc->type().type == TYPE_NULL || desc->type() == grouping_exprs_[i]->type());
+    // Hack to avoid TYPE_NULL SlotRefs.
+    SlotRef* build_expr = pool_->Add(desc->type().type != TYPE_NULL ?
+        new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN));
+    build_exprs_.push_back(build_expr);
+    RETURN_IF_ERROR(build_expr->Init(*intermediate_row_desc_, state));
+    if (build_expr->type().IsVarLenStringType()) string_grouping_exprs_.push_back(i);
+  }
+
+  int j = grouping_exprs_.size();
+  for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i, ++j) {
+    SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j];
+    SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j];
+    AggFn* agg_fn;
+    RETURN_IF_ERROR(AggFn::Create(tnode.agg_node.aggregate_functions[i], row_desc,
+        *intermediate_slot_desc, *output_slot_desc, state, &agg_fn));
+    agg_fns_.push_back(agg_fn);
+    needs_serialize_ |= agg_fn->SupportsSerialize();
   }
   return Status::OK();
 }
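
The ownership chain after this change: AggFn, built once per plan node in Init() against the child's row descriptor, is the static, codegen-able description of an aggregate function; AggFnEvaluator holds the per-instance FunctionContext and input-expression evaluators. A condensed lifecycle sketch using only the calls that appear in this diff:

    // Init(): build one AggFn per aggregate function in the plan.
    AggFn* agg_fn;
    RETURN_IF_ERROR(AggFn::Create(thrift_fn, row_desc, *intermediate_slot_desc,
        *output_slot_desc, state, &agg_fn));
    // Prepare(): create one evaluator per AggFn.
    RETURN_IF_ERROR(AggFnEvaluator::Create(agg_fns_, state, pool_,
        agg_fn_pool_.get(), &agg_fn_evals_));
    // Open(): runs each UDA's Init(); Close(): releases per-instance state.
    RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state));
    AggFnEvaluator::Close(agg_fn_evals_, state);
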
@@ -205,60 +217,16 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
         "MaxPartitionLevel", TUnit::UNIT);
   }
 
-  intermediate_tuple_desc_ =
-      state->desc_tbl().GetTupleDescriptor(intermediate_tuple_id_);
-  output_tuple_desc_ = state->desc_tbl().GetTupleDescriptor(output_tuple_id_);
-  DCHECK_EQ(intermediate_tuple_desc_->slots().size(),
-        output_tuple_desc_->slots().size());
-
-  RETURN_IF_ERROR(Expr::Prepare(grouping_expr_ctxs_, state, child(0)->row_desc(),
-      expr_mem_tracker()));
-  AddExprCtxsToFree(grouping_expr_ctxs_);
-
-  // Construct build exprs from intermediate_agg_tuple_desc_
-  for (int i = 0; i < grouping_expr_ctxs_.size(); ++i) {
-    SlotDescriptor* desc = intermediate_tuple_desc_->slots()[i];
-    DCHECK(desc->type().type == TYPE_NULL ||
-        desc->type() == grouping_expr_ctxs_[i]->root()->type());
-    // Hack to avoid TYPE_NULL SlotRefs.
-    Expr* expr = desc->type().type != TYPE_NULL ?
-        new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN);
-    state->obj_pool()->Add(expr);
-    build_expr_ctxs_.push_back(new ExprContext(expr));
-    state->obj_pool()->Add(build_expr_ctxs_.back());
-    if (expr->type().IsVarLenStringType()) {
-      string_grouping_exprs_.push_back(i);
-    }
-  }
-  // Construct a new row desc for preparing the build exprs because neither the child's
-  // nor this node's output row desc may contain the intermediate tuple, e.g.,
-  // in a single-node plan with an intermediate tuple different from the output tuple.
-  intermediate_row_desc_.reset(new RowDescriptor(intermediate_tuple_desc_, false));
-  RETURN_IF_ERROR(
-      Expr::Prepare(build_expr_ctxs_, state, *intermediate_row_desc_,
-                    expr_mem_tracker()));
-  AddExprCtxsToFree(build_expr_ctxs_);
+  RETURN_IF_ERROR(AggFnEvaluator::Create(agg_fns_, state, pool_, agg_fn_pool_.get(),
+      &agg_fn_evals_));
 
-  int j = grouping_expr_ctxs_.size();
-  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) {
-    SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j];
-    SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j];
-    FunctionContext* agg_fn_ctx = NULL;
-    RETURN_IF_ERROR(aggregate_evaluators_[i]->Prepare(state, child(0)->row_desc(),
-        intermediate_slot_desc, output_slot_desc, agg_fn_pool_.get(), &agg_fn_ctx));
-    agg_fn_ctxs_.push_back(agg_fn_ctx);
-    state->obj_pool()->Add(agg_fn_ctx);
-    needs_serialize_ |= aggregate_evaluators_[i]->SupportsSerialize();
-  }
-
-  if (!grouping_expr_ctxs_.empty()) {
-    RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, grouping_expr_ctxs_,
-        true, vector<bool>(build_expr_ctxs_.size(), true), state->fragment_hash_seed(),
-        MAX_PARTITION_DEPTH, 1, mem_tracker(), &ht_ctx_));
+  if (!grouping_exprs_.empty()) {
+    RETURN_IF_ERROR(HashTableCtx::Create(pool_, state, build_exprs_,
+        grouping_exprs_, true, vector<bool>(build_exprs_.size(), true),
+        state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_mem_pool(), &ht_ctx_));
     RETURN_IF_ERROR(state_->block_mgr()->RegisterClient(
         Substitute("PartitionedAggregationNode id=$0 ptr=$1", id_, this),
         MinRequiredBuffers(), true, mem_tracker(), state, &block_mgr_client_));
-    RETURN_IF_ERROR(CreateHashPartitions(0));
   }
 
   // TODO: Is there a need to create the stream here? If memory reservations work we may
@@ -296,23 +264,18 @@ void PartitionedAggregationNode::Codegen(RuntimeState* state) {
 Status PartitionedAggregationNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
-
-  RETURN_IF_ERROR(Expr::Open(grouping_expr_ctxs_, state));
-  RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state));
-
-  DCHECK_EQ(aggregate_evaluators_.size(), agg_fn_ctxs_.size());
-  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
-    RETURN_IF_ERROR(aggregate_evaluators_[i]->Open(state, agg_fn_ctxs_[i]));
-  }
-
-  if (grouping_expr_ctxs_.empty()) {
+  if (ht_ctx_.get() != nullptr) RETURN_IF_ERROR(ht_ctx_->Open(state));
+  RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state));
+  if (grouping_exprs_.empty()) {
     // Create the single output tuple for this non-grouping agg. This must happen after
     // opening the aggregate evaluators.
     singleton_output_tuple_ =
-        ConstructSingletonOutputTuple(agg_fn_ctxs_, mem_pool_.get());
+        ConstructSingletonOutputTuple(agg_fn_evals_, mem_pool_.get());
     // Check for failures during AggFnEvaluator::Init().
     RETURN_IF_ERROR(state_->GetQueryStatus());
     singleton_output_tuple_returned_ = false;
+  } else {
+    RETURN_IF_ERROR(CreateHashPartitions(0));
   }
 
   RETURN_IF_ERROR(children_[0]->Open(state));
@@ -337,7 +300,7 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
 
     TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
     SCOPED_TIMER(build_timer_);
-    if (grouping_expr_ctxs_.empty()) {
+    if (grouping_exprs_.empty()) {
       if (process_batch_no_grouping_fn_ != NULL) {
         RETURN_IF_ERROR(process_batch_no_grouping_fn_(this, &batch));
       } else {
@@ -364,7 +327,7 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
   // Done consuming child(0)'s input. Move all the partitions in hash_partitions_
   // to spilled_partitions_ or aggregated_partitions_. We'll finish the processing in
   // GetNext().
-  if (!grouping_expr_ctxs_.empty()) {
+  if (!grouping_exprs_.empty()) {
     RETURN_IF_ERROR(MoveHashPartitions(child(0)->rows_returned()));
   }
   return Status::OK();
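
Note the structural change in this function: CreateHashPartitions(0) moves out of Prepare() (and, below, out of Reset()) into Open(), so partitions and their buffers are only allocated once the node actually starts executing. Condensed from the hunks above:

    RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state));
    if (grouping_exprs_.empty()) {
      // Non-grouping agg: one output tuple, built after the UDAs' Init() ran.
      singleton_output_tuple_ =
          ConstructSingletonOutputTuple(agg_fn_evals_, mem_pool_.get());
    } else {
      RETURN_IF_ERROR(CreateHashPartitions(0));  // was in Prepare()/Reset()
    }
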
@@ -385,16 +348,16 @@ Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
   // the agg function contexts, and will be freed on the next GetNext() call by
   // FreeLocalAllocations(). The data either needs to be copied out now or sent up the
   // plan and copied out by a blocking ancestor. (See IMPALA-3311)
-  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
-    const SlotDescriptor* slot_desc = aggregate_evaluators_[i]->output_slot_desc();
-    DCHECK(!slot_desc->type().IsCollectionType()) << "producing collections NYI";
-    if (!slot_desc->type().IsVarLenStringType()) continue;
+  for (const AggFn* agg_fn : agg_fns_) {
+    const SlotDescriptor& slot_desc = agg_fn->output_slot_desc();
+    DCHECK(!slot_desc.type().IsCollectionType()) << "producing collections NYI";
+    if (!slot_desc.type().IsVarLenStringType()) continue;
     if (IsInSubplan()) {
       // Copy string data to the row batch's pool. This is more efficient than
       // MarkNeedsDeepCopy() in a subplan since we are likely producing many small
       // batches.
-      RETURN_IF_ERROR(CopyStringData(
-          slot_desc, row_batch, first_row_idx, row_batch->tuple_data_pool()));
+      RETURN_IF_ERROR(CopyStringData(slot_desc, row_batch,
+          first_row_idx, row_batch->tuple_data_pool()));
     } else {
       row_batch->MarkNeedsDeepCopy();
       break;
@@ -403,14 +366,14 @@ Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
   return Status::OK();
 }
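
Both branches above serve one policy: var-len UDA results live in memory owned by the agg function contexts and are freed on the next GetNext(), so they must be copied before then. In a subplan only the string bytes are relocated (cheaper than deep-copying many small batches); otherwise the whole batch is flagged for a blocking ancestor to deep-copy. The core of the relocation, from CopyStringData() below (sketch; the error path is abbreviated):

    // Move each StringValue's bytes into 'pool' so they outlive the contexts.
    char* new_ptr = reinterpret_cast<char*>(pool->TryAllocate(sv->len));
    if (UNLIKELY(new_ptr == NULL)) return Status(/* mem limit exceeded */);
    memcpy(new_ptr, sv->ptr, sv->len);
    sv->ptr = new_ptr;
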
 
-Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor* slot_desc,
+Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_desc,
     RowBatch* row_batch, int first_row_idx, MemPool* pool) {
-  DCHECK(slot_desc->type().IsVarLenStringType());
+  DCHECK(slot_desc.type().IsVarLenStringType());
   DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1);
   FOREACH_ROW(row_batch, first_row_idx, batch_iter) {
     Tuple* tuple = batch_iter.Get()->GetTuple(0);
     StringValue* sv = reinterpret_cast<StringValue*>(
-        tuple->GetSlot(slot_desc->tuple_offset()));
+        tuple->GetSlot(slot_desc.tuple_offset()));
     if (sv == NULL || sv->len == 0) continue;
     char* new_ptr = reinterpret_cast<char*>(pool->TryAllocate(sv->len));
     if (UNLIKELY(new_ptr == NULL)) {
@@ -436,7 +399,7 @@ Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state,
     return Status::OK();
   }
 
-  if (grouping_expr_ctxs_.empty()) {
+  if (grouping_exprs_.empty()) {
     // There was no grouping, so evaluate the conjuncts and return the single result row.
     // We allow calling GetNext() after eos, so don't return this row again.
     if (!singleton_output_tuple_returned_) GetSingletonOutput(row_batch);
@@ -459,13 +422,14 @@ Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state,
 }
 
 void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
-  DCHECK(grouping_expr_ctxs_.empty());
+  DCHECK(grouping_exprs_.empty());
   int row_idx = row_batch->AddRow();
   TupleRow* row = row_batch->GetRow(row_idx);
-  Tuple* output_tuple = GetOutputTuple(
-      agg_fn_ctxs_, singleton_output_tuple_, row_batch->tuple_data_pool());
+  Tuple* output_tuple = GetOutputTuple(agg_fn_evals_,
+      singleton_output_tuple_, row_batch->tuple_data_pool());
   row->SetTuple(0, output_tuple);
-  if (ExecNode::EvalConjuncts(&conjunct_ctxs_[0], conjunct_ctxs_.size(), row)) {
+  if (ExecNode::EvalConjuncts(
+          conjunct_evals_.data(), conjunct_evals_.size(), row)) {
     row_batch->CommitLastRow();
     ++num_rows_returned_;
     COUNTER_SET(rows_returned_counter_, num_rows_returned_);
@@ -511,11 +475,12 @@ Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
     int row_idx = row_batch->AddRow();
     TupleRow* row = row_batch->GetRow(row_idx);
     Tuple* intermediate_tuple = output_iterator_.GetTuple();
-    Tuple* output_tuple = GetOutputTuple(
-        output_partition_->agg_fn_ctxs, intermediate_tuple, row_batch->tuple_data_pool());
+    Tuple* output_tuple = GetOutputTuple(output_partition_->agg_fn_evals,
+        intermediate_tuple, row_batch->tuple_data_pool());
     output_iterator_.Next();
     row->SetTuple(0, output_tuple);
-    if (ExecNode::EvalConjuncts(&conjunct_ctxs_[0], conjunct_ctxs_.size(), row)) {
+    DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
+    if (ExecNode::EvalConjuncts(conjunct_evals_.data(), conjuncts_.size(), row)) {
       row_batch->CommitLastRow();
       ++num_rows_returned_;
       if (ReachedLimit() || row_batch->AtCapacity()) {
@@ -648,7 +613,7 @@ bool PartitionedAggregationNode::ShouldExpandPreaggHashTables() const {
 }
 
 void PartitionedAggregationNode::CleanupHashTbl(
-    const vector<FunctionContext*>& agg_fn_ctxs, HashTable::Iterator it) {
+    const vector<AggFnEvaluator*>& agg_fn_evals, HashTable::Iterator it) {
   if (!needs_finalize_ && !needs_serialize_) return;
 
   // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
@@ -660,13 +625,13 @@ void PartitionedAggregationNode::CleanupHashTbl(
     dummy_dst = Tuple::Create(output_tuple_desc_->byte_size(), mem_pool_.get());
     while (!it.AtEnd()) {
       Tuple* tuple = it.GetTuple();
-      AggFnEvaluator::Finalize(aggregate_evaluators_, agg_fn_ctxs, tuple, dummy_dst);
+      AggFnEvaluator::Finalize(agg_fn_evals, tuple, dummy_dst);
       it.Next();
     }
   } else {
     while (!it.AtEnd()) {
       Tuple* tuple = it.GetTuple();
-      AggFnEvaluator::Serialize(aggregate_evaluators_, agg_fn_ctxs, tuple);
+      AggFnEvaluator::Serialize(agg_fn_evals, tuple);
       it.Next();
     }
   }
@@ -674,13 +639,12 @@ void PartitionedAggregationNode::CleanupHashTbl(
 
 Status PartitionedAggregationNode::Reset(RuntimeState* state) {
   DCHECK(!is_streaming_preagg_) << "Cannot reset preaggregation";
-  if (!grouping_expr_ctxs_.empty()) {
+  if (!grouping_exprs_.empty()) {
     child_eos_ = false;
     partition_eos_ = false;
     // Reset the HT and the partitions for this grouping agg.
     ht_ctx_->set_level(0);
     ClosePartitions();
-    RETURN_IF_ERROR(CreateHashPartitions(0));
   }
   return ExecNode::Reset(state);
 }
@@ -689,40 +653,34 @@ void PartitionedAggregationNode::Close(RuntimeState* state) {
   if (is_closed()) return;
 
   if (!singleton_output_tuple_returned_) {
-    DCHECK_EQ(agg_fn_ctxs_.size(), aggregate_evaluators_.size());
-    GetOutputTuple(agg_fn_ctxs_, singleton_output_tuple_, mem_pool_.get());
+    GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, mem_pool_.get());
   }
 
   // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
   // them in order to free any memory allocated by UDAs
   if (output_partition_ != NULL) {
-    CleanupHashTbl(output_partition_->agg_fn_ctxs, output_iterator_);
+    CleanupHashTbl(output_partition_->agg_fn_evals, output_iterator_);
     output_partition_->Close(false);
   }
 
   ClosePartitions();
 
   child_batch_.reset();
-  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
-    aggregate_evaluators_[i]->Close(state);
-  }
-  agg_expr_ctxs_.clear();
-  for (int i = 0; i < agg_fn_ctxs_.size(); ++i) {
-    agg_fn_ctxs_[i]->impl()->Close();
-  }
-  if (agg_fn_pool_.get() != NULL) agg_fn_pool_->FreeAll();
-  if (mem_pool_.get() != NULL) mem_pool_->FreeAll();
-  if (ht_ctx_.get() != NULL) ht_ctx_->Close();
-  if (serialize_stream_.get() != NULL) {
-    serialize_stream_->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-  }
 
-  if (block_mgr_client_ != NULL) {
-    state->block_mgr()->ClearReservations(block_mgr_client_);
-  }
+  // Close all the agg-fn-evaluators
+  AggFnEvaluator::Close(agg_fn_evals_, state);
 
-  Expr::Close(grouping_expr_ctxs_, state);
-  Expr::Close(build_expr_ctxs_, state);
+  if (agg_fn_pool_.get() != nullptr) agg_fn_pool_->FreeAll();
+  if (mem_pool_.get() != nullptr) mem_pool_->FreeAll();
+  if (ht_ctx_.get() != nullptr) ht_ctx_->Close(state);
+  ht_ctx_.reset();
+  if (serialize_stream_.get() != nullptr) {
+    serialize_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
+  if (block_mgr_client_ != nullptr) state->block_mgr()->ClearReservations(block_mgr_client_);
+  ScalarExpr::Close(grouping_exprs_);
+  ScalarExpr::Close(build_exprs_);
+  AggFn::Close(agg_fns_);
   ExecNode::Close(state);
 }
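
The teardown order in Close() matters: remaining hash-table rows are finalized or serialized first so UDAs can free what they allocated, then partitions close, then the evaluators and pools, and only then the shared exprs and AggFns, since evaluators may reference them until they are closed. The dependency chain, with names from this diff:

    CleanupHashTbl(output_partition_->agg_fn_evals, output_iterator_);  // UDA cleanup
    ClosePartitions();                            // per-partition evaluators, streams
    AggFnEvaluator::Close(agg_fn_evals_, state);  // per-instance state
    ScalarExpr::Close(grouping_exprs_);           // shared exprs last
    ScalarExpr::Close(build_exprs_);
    AggFn::Close(agg_fns_);
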
 
@@ -732,16 +690,13 @@ PartitionedAggregationNode::Partition::~Partition() {
 
 Status PartitionedAggregationNode::Partition::InitStreams() {
   agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker()));
-  DCHECK_EQ(agg_fn_ctxs.size(), 0);
-  for (int i = 0; i < parent->agg_fn_ctxs_.size(); ++i) {
-    agg_fn_ctxs.push_back(parent->agg_fn_ctxs_[i]->impl()->Clone(agg_fn_pool.get()));
-    parent->partition_pool_->Add(agg_fn_ctxs[i]);
-  }
-
+  DCHECK_EQ(agg_fn_evals.size(), 0);
+  AggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_pool.get(),
+      parent->agg_fn_evals_, &agg_fn_evals);
   // Varlen aggregate function results are stored outside of aggregated_row_stream because
   // BufferedTupleStream doesn't support relocating varlen data stored in the stream.
   auto agg_slot = parent->intermediate_tuple_desc_->slots().begin() +
-      parent->grouping_expr_ctxs_.size();
+      parent->grouping_exprs_.size();
   set<SlotId> external_varlen_slots;
   for (; agg_slot != parent->intermediate_tuple_desc_->slots().end(); ++agg_slot) {
     if ((*agg_slot)->type().IsVarLenStringType()) {
@@ -813,8 +768,6 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
     DCHECK(!parent->serialize_stream_->is_pinned());
     DCHECK(parent->serialize_stream_->has_write_block());
 
-    const vector<AggFnEvaluator*>& evaluators = parent->aggregate_evaluators_;
-
     // Serialize and copy the spilled partition's stream into the new stream.
     Status status = Status::OK();
     bool failed_to_add = false;
@@ -823,7 +776,7 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
     while (!it.AtEnd()) {
       Tuple* tuple = it.GetTuple();
       it.Next();
-      AggFnEvaluator::Serialize(evaluators, agg_fn_ctxs, tuple);
+      AggFnEvaluator::Serialize(agg_fn_evals, tuple);
       if (UNLIKELY(!new_stream->AddRow(reinterpret_cast<TupleRow*>(&tuple), &status))) {
         failed_to_add = true;
         break;
@@ -834,7 +787,7 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
     // clean up easier (someone has to finalize this stream and we don't want to remember
     // where we are).
     if (failed_to_add) {
-      parent->CleanupHashTbl(agg_fn_ctxs, it);
+      parent->CleanupHashTbl(agg_fn_evals, it);
       hash_tbl->Close();
       hash_tbl.reset();
       aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
@@ -881,9 +834,8 @@ Status PartitionedAggregationNode::Partition::Spill() {
   RETURN_IF_ERROR(SerializeStreamForSpilling());
 
   // Free the in-memory result data.
-  for (int i = 0; i < agg_fn_ctxs.size(); ++i) {
-    agg_fn_ctxs[i]->impl()->Close();
-  }
+  AggFnEvaluator::Close(agg_fn_evals, parent->state_);
+  agg_fn_evals.clear();
 
   if (agg_fn_pool.get() != NULL) {
     agg_fn_pool->FreeAll();
@@ -938,7 +890,7 @@ void PartitionedAggregationNode::Partition::Close(bool finalize_rows) {
       // We need to walk all the rows and Finalize them here so the UDA gets a chance
       // to cleanup. If the hash table is gone (meaning this was spilled), the rows
       // should have been finalized/serialized in Spill().
-      parent->CleanupHashTbl(agg_fn_ctxs, hash_tbl->Begin(parent->ht_ctx_.get()));
+      parent->CleanupHashTbl(agg_fn_evals, hash_tbl->Begin(parent->ht_ctx_.get()));
     }
     aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
@@ -946,23 +898,21 @@ void PartitionedAggregationNode::Partition::Close(bool finalize_rows) {
   if (unaggregated_row_stream.get() != NULL) {
     unaggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
-
-  for (int i = 0; i < agg_fn_ctxs.size(); ++i) {
-    agg_fn_ctxs[i]->impl()->Close();
-  }
+  for (AggFnEvaluator* eval : agg_fn_evals) eval->Close(parent->state_);
   if (agg_fn_pool.get() != NULL) agg_fn_pool->FreeAll();
 }
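
Per-partition evaluator lifetime in this patch: InitStreams() shallow-clones the node's evaluators (each clone shares the underlying AggFn but gets the partition's own MemPool and FunctionContext), Spill() closes and clears them once the in-memory results have been serialized, and Partition::Close() closes whatever remains. Condensed:

    // InitStreams(): one evaluator set per partition.
    AggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_pool.get(),
        parent->agg_fn_evals_, &agg_fn_evals);
    // Spill(): in-memory results are gone, so drop the evaluators early.
    AggFnEvaluator::Close(agg_fn_evals, parent->state_);
    agg_fn_evals.clear();
    // Close(): close any evaluators still alive.
    for (AggFnEvaluator* eval : agg_fn_evals) eval->Close(parent->state_);
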
 
 Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple(
-    const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool) {
-  DCHECK(grouping_expr_ctxs_.empty());
+    const vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool) {
+  DCHECK(grouping_exprs_.empty());
   Tuple* output_tuple = Tuple::Create(intermediate_tuple_desc_->byte_size(), pool);
-  InitAggSlots(agg_fn_ctxs, output_tuple);
+  InitAggSlots(agg_fn_evals, output_tuple);
   return output_tuple;
 }
 
 Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
-    const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool, Status* status) noexcept {
+    const vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool,
+    Status* status) noexcept {
   const int fixed_size = intermediate_tuple_desc_->byte_size();
   const int varlen_size = GroupingExprsVarlenSize();
   const int tuple_data_size = fixed_size + varlen_size;
@@ -977,12 +927,12 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
   Tuple* intermediate_tuple = reinterpret_cast<Tuple*>(tuple_data);
   uint8_t* varlen_data = tuple_data + fixed_size;
   CopyGroupingValues(intermediate_tuple, varlen_data, varlen_size);
-  InitAggSlots(agg_fn_ctxs, intermediate_tuple);
+  InitAggSlots(agg_fn_evals, intermediate_tuple);
   return intermediate_tuple;
 }
 
 Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
-    const vector<FunctionContext*>& agg_fn_ctxs, BufferedTupleStream* stream,
+    const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStream* stream,
     Status* status) noexcept {
   DCHECK(stream != NULL && status != NULL);
   // Allocate space for the entire tuple in the stream.
@@ -1007,7 +957,7 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
   Tuple* intermediate_tuple = reinterpret_cast<Tuple*>(fixed_buffer);
   intermediate_tuple->Init(fixed_size);
   CopyGroupingValues(intermediate_tuple, varlen_buffer, varlen_size);
-  InitAggSlots(agg_fn_ctxs, intermediate_tuple);
+  InitAggSlots(agg_fn_evals, intermediate_tuple);
   return intermediate_tuple;
 }
 
@@ -1026,7 +976,7 @@ int PartitionedAggregationNode::GroupingExprsVarlenSize() {
 void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,
     uint8_t* buffer, int varlen_size) {
   // Copy over all grouping slots (the variable length data is copied below).
-  for (int i = 0; i < grouping_expr_ctxs_.size(); ++i) {
+  for (int i = 0; i < grouping_exprs_.size(); ++i) {
     SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[i];
     if (ht_ctx_->ExprValueNull(i)) {
       intermediate_tuple->SetNull(slot_desc->null_indicator_offset());
@@ -1052,11 +1002,10 @@ void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,
 
 // TODO: codegen this function.
 void PartitionedAggregationNode::InitAggSlots(
-    const vector<FunctionContext*>& agg_fn_ctxs, Tuple* intermediate_tuple) {
+    const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple) {
   vector<SlotDescriptor*>::const_iterator slot_desc =
-      intermediate_tuple_desc_->slots().begin() + grouping_expr_ctxs_.size();
-  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++slot_desc) {
-    AggFnEvaluator* evaluator = aggregate_evaluators_[i];
+      intermediate_tuple_desc_->slots().begin() + grouping_exprs_.size();
+  for (int i = 0; i < agg_fn_evals.size(); ++i, ++slot_desc) {
     // To minimize branching on the UpdateTuple path, initialize the result value so that
     // the Add() UDA function can ignore the NULL bit of its destination value. E.g. for
     // SUM(), if we initialize the destination value to 0 (with the NULL bit set), we can
@@ -1067,18 +1016,21 @@ void PartitionedAggregationNode::InitAggSlots(
     // For boolean and numeric types, the default values are false/0, so the nullable
     // aggregate functions SUM() and AVG() produce the correct result. For MIN()/MAX(),
     // initialize the value to max/min possible value for the same effect.
-    evaluator->Init(agg_fn_ctxs[i], intermediate_tuple);
-
-    AggFnEvaluator::AggregationOp agg_op = evaluator->agg_op();
-    if ((agg_op == AggFnEvaluator::MIN || agg_op == AggFnEvaluator::MAX)
-        && !evaluator->intermediate_type().IsStringType()
-        && !evaluator->intermediate_type().IsTimestampType()) {
+    AggFnEvaluator* eval = agg_fn_evals[i];
+    eval->Init(intermediate_tuple);
+
+    DCHECK(agg_fns_[i] == &(eval->agg_fn()));
+    const AggFn* agg_fn = agg_fns_[i];
+    const AggFn::AggregationOp agg_op = agg_fn->agg_op();
+    if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) &&
+        !agg_fn->intermediate_type().IsStringType() &&
+        !agg_fn->intermediate_type().IsTimestampType()) {
       ExprValue default_value;
       void* default_value_ptr = NULL;
-      if (evaluator->agg_op() == AggFnEvaluator::MIN) {
+      if (agg_op == AggFn::MIN) {
         default_value_ptr = default_value.SetToMax((*slot_desc)->type());
       } else {
-        DCHECK_EQ(evaluator->agg_op(), AggFnEvaluator::MAX);
+        DCHECK_EQ(agg_op, AggFn::MAX);
         default_value_ptr = default_value.SetToMin((*slot_desc)->type());
       }
       RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
@@ -1086,34 +1038,34 @@ void PartitionedAggregationNode::InitAggSlots(
   }
 }
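
A worked example of the initialization trick described in InitAggSlots(), as standalone C++ rather than node code: pre-setting MIN's destination storage to the type's maximum while the NULL bit is still set lets the update path apply min() unconditionally and merely clear the NULL bit.

    #include <algorithm>
    #include <limits>

    // dst is logically NULL, but its storage already holds MIN's identity.
    int64_t dst = std::numeric_limits<int64_t>::max();
    bool dst_is_null = true;
    auto update_min = [&](int64_t src) {
      dst = std::min(dst, src);  // correct even for the first non-NULL input
      dst_is_null = false;       // no branch on the old NULL bit required
    };
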
 
-void PartitionedAggregationNode::UpdateTuple(
-    FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row, bool is_merge) noexcept {
-  DCHECK(tuple != NULL || aggregate_evaluators_.empty());
-  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
+void PartitionedAggregationNode::UpdateTuple(AggFnEvaluator** agg_fn_evals,
+    Tuple* tuple, TupleRow* row, bool is_merge) noexcept {
+  DCHECK(tuple != NULL || agg_fns_.empty());
+  for (int i = 0; i < agg_fns_.size(); ++i) {
     if (is_merge) {
-      aggregate_evaluators_[i]->Merge(agg_fn_ctxs[i], row->GetTuple(0), tuple);
+      agg_fn_evals[i]->Merge(row->GetTuple(0), tuple);
     } else {
-      aggregate_evaluators_[i]->Add(agg_fn_ctxs[i], row, tuple);
+      agg_fn_evals[i]->Add(row, tuple);
     }
   }
 }
 
 Tuple* PartitionedAggregationNode::GetOutputTuple(
-    const vector<FunctionContext*>& agg_fn_ctxs, Tuple* tuple, MemPool* pool) {
-  DCHECK(tuple != NULL || aggregate_evaluators_.empty()) << tuple;
+    const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool) {
+  DCHECK(tuple != NULL || agg_fn_evals.empty()) << tuple;
   Tuple* dst = tuple;
   if (needs_finalize_ && intermediate_tuple_id_ != output_tuple_id_) {
     dst = Tuple::Create(output_tuple_desc_->byte_size(), pool);
   }
   if (needs_finalize_) {
-    AggFnEvaluator::Finalize(aggregate_evaluators_, agg_fn_ctxs, tuple, dst);
+    AggFnEvaluator::Finalize(agg_fn_evals, tuple, dst);
   } else {
-    AggFnEvaluator::Serialize(aggregate_evaluators_, agg_fn_ctxs, tuple);
+    AggFnEvaluator::Serialize(agg_fn_evals, tuple);
   }
   // Copy grouping values from tuple to dst.
   // TODO: Codegen this.
   if (dst != tuple) {
-    int num_grouping_slots = grouping_expr_ctxs_.size();
+    int num_grouping_slots = grouping_exprs_.size();
     for (int i = 0; i < num_grouping_slots; ++i) {
       SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i];
       SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i];
@@ -1155,8 +1107,8 @@ void PartitionedAggregationNode::DebugString(int indentation_level,
        << "intermediate_tuple_id=" << intermediate_tuple_id_
        << " output_tuple_id=" << output_tuple_id_
        << " needs_finalize=" << needs_finalize_
-       << " grouping_exprs=" << Expr::DebugString(grouping_expr_ctxs_)
-       << " agg_exprs=" << AggFnEvaluator::DebugString(aggregate_evaluators_);
+       << " grouping_exprs=" << ScalarExpr::DebugString(grouping_exprs_)
+       << " agg_exprs=" << AggFn::DebugString(agg_fns_);
   ExecNode::DebugString(indentation_level, out);
   *out << ")";
 }
@@ -1326,8 +1278,7 @@ Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stre
       RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos));
       RETURN_IF_ERROR(
           ProcessBatch<AGGREGATED_ROWS>(&batch, prefetch_mode, ht_ctx_.get()));
-      RETURN_IF_ERROR(state_->GetQueryStatus());
-      FreeLocalAllocations();
+      RETURN_IF_ERROR(QueryMaintenance(state_));
       batch.Reset();
     } while (!eos);
   }
@@ -1433,37 +1384,36 @@ void PartitionedAggregationNode::ClosePartitions() {
 }
 
 Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
-  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
-    ExprContext::FreeLocalAllocations(aggregate_evaluators_[i]->input_expr_ctxs());
-  }
-  ExprContext::FreeLocalAllocations(agg_fn_ctxs_);
+  AggFnEvaluator::FreeLocalAllocations(agg_fn_evals_);
   for (int i = 0; i < hash_partitions_.size(); ++i) {
-    ExprContext::FreeLocalAllocations(hash_partitions_[i]->agg_fn_ctxs);
+    AggFnEvaluator::FreeLocalAllocations(hash_partitions_[i]->agg_fn_evals);
   }
+  if (ht_ctx_.get() != nullptr) ht_ctx_->FreeLocalAllocations();
   return ExecNode::QueryMaintenance(state);
 }
 
 // IR Generation for updating a single aggregation slot. Signature is:
-// void UpdateSlot(FunctionContext* agg_fn_ctx, ExprContext* agg_expr_ctx,
-//     AggTuple* agg_tuple, char** row)
+// void UpdateSlot(AggFnEvaluator* agg_expr_eval, AggTuple* agg_tuple, char** row)
 //
 // The IR for sum(double_col), which is constructed directly with the IRBuilder, is:
 //
-// define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %agg_fn_ctx,
-//    %"class.impala::ExprContext"** %agg_expr_ctxs,
-//    { i8, [7 x i8], double }* %agg_tuple, %"class.impala::TupleRow"* %row) #34 {
+// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
+//     <{ double, i8 }>* %agg_tuple, %"class.impala::TupleRow"* %row) #33 {
 // entry:
-//   %expr_ctx_ptr = getelementptr %"class.impala::ExprContext"*,
-//      %"class.impala::ExprContext"** %agg_expr_ctxs, i32 0
-//   %expr_ctx = load %"class.impala::ExprContext"*,
-//      %"class.impala::ExprContext"** %expr_ctx_ptr
-//   %input0 = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* %expr_ctx,
-//      %"class.impala::TupleRow"* %row)
-//   %dst_slot_ptr = getelementptr inbounds { i8, [7 x i8], double },
-//       { i8, [7 x i8], double }* %agg_tuple, i32 0, i32 2
+//   %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
+//       @_ZNK6impala14AggFnEvaluator11input_evalsEv(
+//           %"class.impala::AggFnEvaluator"* %agg_fn_eval)
+//   %0 = getelementptr %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
+//   %input_eval = load %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %0
+//   %input0 = call { i8, double } @GetSlotRef(%"class.impala::ScalarExprEvaluator"*
+//       %input_eval, %"class.impala::TupleRow"* %row)
+//   %dst_slot_ptr = getelementptr inbounds <{ double, i8 }>,
+//       <{ double, i8 }>* %agg_tuple, i32 0, i32 0
 //   %dst_val = load double, double* %dst_slot_ptr
-//   %0 = extractvalue { i8, double } %input0, 0
-//   %is_null = trunc i8 %0 to i1
+//   %1 = extractvalue { i8, double } %input0, 0
+//   %is_null = trunc i8 %1 to i1
 //   br i1 %is_null, label %ret, label %not_null
 //
 // ret:                                              ; preds = %not_null, %entry
@@ -1471,83 +1421,68 @@ Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
 //
 // not_null:                                         ; preds = %entry
 //   %val = extractvalue { i8, double } %input0, 1
-//   %1 = fadd double %dst_val, %val
-//   %2 = bitcast { i8, [7 x i8], double }* %agg_tuple to i8*
-//   %null_byte_ptr = getelementptr i8, i8* %2, i32 0
+//   %2 = fadd double %dst_val, %val
+//   %3 = bitcast <{ double, i8 }>* %agg_tuple to i8*
+//   %null_byte_ptr = getelementptr inbounds i8, i8* %3, i32 8
 //   %null_byte = load i8, i8* %null_byte_ptr
 //   %null_bit_cleared = and i8 %null_byte, -2
 //   store i8 %null_bit_cleared, i8* %null_byte_ptr
-//   store double %1, double* %dst_slot_ptr
+//   store double %2, double* %dst_slot_ptr
 //   br label %ret
 // }
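
Roughly equivalent C++ for the IR above (a sketch: the <{ double, i8 }> layout is rendered as illustrative field names, and input_evals()/GetSlotRef() are the calls visible in the IR):

    // void UpdateSlot(AggFnEvaluator* agg_fn_eval, AggTuple* agg_tuple, TupleRow* row)
    ScalarExprEvaluator* input_eval = agg_fn_eval->input_evals()[0];
    DoubleVal input0 = GetSlotRef(input_eval, row);  // evaluate the input expr
    if (input0.is_null) return;                      // br i1 %is_null, label %ret
    agg_tuple->sum += input0.val;                    // fadd + store to dst slot
    agg_tuple->null_byte &= ~1;                      // clear the slot's NULL bit
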
 //
-// The IR for min(timestamp_col), which uses the UDA interface, is:
+// The IR for ndv(timestamp_col), which uses the UDA interface, is:
 //
-// define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %agg_fn_ctx,
-//      %"class.impala::ExprContext"** %agg_expr_ctxs,
-//      { i8, [7 x i8], %"class.impala::TimestampValue" }* %agg_tuple,
-//      %"class.impala::TupleRow"* %row) #34 {
+// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
+//     <{ %"struct.impala::StringValue" }>* %agg_tuple,
+//     %"class.impala::TupleRow"* %row) #33 {
 // entry:
-//   %dst_lowered_ptr = alloca { i64, i64 }
-//   %input_lowered_ptr = alloca { i64, i64 }
-//   %expr_ctx_ptr = getelementptr %"class.impala::ExprContext"*,
-//        %"class.impala::ExprContext"** %agg_expr_ctxs, i32 0
-//   %expr_ctx = load %"class.impala::ExprContext"*,
-//        %"class.impala::ExprContext"** %expr_ctx_ptr
-//   %input0 = call { i64, i64 } @GetSlotRef(%"class.impala::ExprContext"* %expr_ctx,
-//        %"class.impala::TupleRow"* %row)
-//   %dst_slot_ptr = getelementptr inbounds { i8, [7 x i8],
-//        %"class.impala::TimestampValue" }, { i8, [7 x i8],
-//        %"class.impala::TimestampValue" }* %agg_tuple, i32 0, i32 2
-//   %dst_val = load %"class.impala::TimestampValue",
-//        %"class.impala::TimestampValue"* %dst_slot_ptr
-//   %0 = bitcast { i8, [7 x i8], %"class.impala::TimestampValue" }* %agg_tuple to i8*
-//   %null_byte_ptr = getelementptr i8, i8* %0, i32 0
-//   %null_byte = load i8, i8* %null_byte_ptr
-//   %null_mask = and i8 %null_byte, 1
-//   %is_null = icmp ne i8 %null_mask, 0
-//   %is_null_ext = zext i1 %is_null to i64
-//   %1 = or i64 0, %is_null_ext
-//   %dst = insertvalue { i64, i64 } zeroinitializer, i64 %1, 0
-//   %time_of_day = extractvalue %"class.impala::TimestampValue" %dst_val, 0, 0, 0, 0
-//   %dst1 = insertvalue { i64, i64 } %dst, i64 %time_of_day, 1
-//   %date = extractvalue %"class.impala::TimestampValue" %dst_val, 1, 0, 0
-//   %2 = extractvalue { i64, i64 } %dst1, 0
-//   %3 = zext i32 %date to i64
+//   %dst_lowered_ptr = alloca { i64, i8* }
+//   %0 = alloca { i64, i64 }
+//   %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
+//       @_ZNK6impala14AggFnEvaluator11input_evalsEv(
+//           %"class.impala::AggFnEvaluator"* %agg_fn_eval)
+//   %1 = getelementptr %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
+//   %input_eval = load %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %1
+//   %input0 = call { i64, i64 } @GetSlotRef(
+//       %"class.impala::ScalarExprEvaluator"* %input_eval,
+//       %"class.impala::TupleRow"* %row)
+//   %dst_slot_ptr = getelementptr inbounds <{ %"struct.impala::StringValue" }>,
+//       <{ %"struct.impala::StringValue" }>* %agg_tuple, i32 0, i32 0
+//   %dst_val =
+//       load %"struct.impala::StringValue", %"struct.impala::StringValue"* 
%dst_slot_ptr
+//   %ptr = extractvalue %"struct.impala::StringValue" %dst_val, 0
+//   %dst = insertvalue { i64, i8* } zeroinitializer, i8* %ptr, 1
+//   %len = extractvalue %"struct.impala::StringValue" %dst_val, 1
+//   %2 = extractvalue { i64, i8* } %dst, 0
+//   %3 = zext i32 %len to i64
 //   %4 = shl i64 %3, 32
 //   %5 = and i64 %2, 4294967295
 //   %6 = or i64 %5, %4
-//   %dst2 = insertvalue { i64, i64 } %dst1, i64 %6, 0
-//   store { i64, i64 } %input0, { i64, i64 }* %input_lowered_ptr
-//   %input_unlowered_ptr = bitcast { i64, i64 }* %input_lowered_ptr
-//        to %"struct.impala_udf::TimestampVal"*
-//   store { i64, i64 } %dst2, { i64, i64 }* %dst_lowered_ptr
-//   %dst_unlowered_ptr = bitcast { i64, i64 }* %dst_lowered_ptr
-//        to %"struct.impala_udf::TimestampVal"*
-//   call void
-//        
@_ZN6impala18AggregateFunctions3MinIN10impala_udf12TimestampValEEEvPNS2_15FunctionContextERKT_PS6_.2(
-//        %"class.impala_udf::FunctionContext"* %agg_fn_ctx,
-//        %"struct.impala_udf::TimestampVal"* %input_unlowered_ptr,
-//        %"struct.impala_udf::TimestampVal"* %dst_unlowered_ptr)
-//   %anyval_result = load { i64, i64 }, { i64, i64 }* %dst_lowered_ptr
-//   %7 = extractvalue { i64, i64 } %anyval_result, 1
-//   %8 = insertvalue %"class.impala::TimestampValue" zeroinitializer, i64 %7, 0, 0, 0, 0
-//   %9 = extractvalue { i64, i64 } %anyval_result, 0
-//   %10 = ashr i64 %9, 32
-//   %11 = trunc i64 %10 to i32
-//   %12 = insertvalue %"class.impala::TimestampValue" %8, i32 %11, 1, 0, 0
-//   %13 = extractvalue { i64, i64 } %anyval_result, 0
-//   %result_is_null = trunc i64 %13 to i1
-//   %14 = bitcast { i8, [7 x i8], %"class.impala::TimestampValue" }* %agg_tuple to i8*
-//   %null_byte_ptr3 = getelementptr i8, i8* %14, i32 0
-//   %null_byte4 = load i8, i8* %null_byte_ptr3
-//   %null_bit_cleared = and i8 %null_byte4, -2
-//   %15 = sext i1 %result_is_null to i8
-//   %null_bit = and i8 %15, 1
-//   %null_bit_set = or i8 %null_bit_cleared, %null_bit
-//   store i8 %null_bit_set, i8* %null_byte_ptr3
-//   store %"class.impala::TimestampValue" %12,
-//        %"class.impala::TimestampValue"* %dst_slot_ptr
+//   %dst1 = insertvalue { i64, i8* } %dst, i64 %6, 0
+//   %agg_fn_ctx = call %"class.impala_udf::FunctionContext"*
+//       @_ZNK6impala14AggFnEvaluator10agg_fn_ctxEv(
+//           %"class.impala::AggFnEvaluator"* %agg_fn_eval)
+//   store { i64, i64 } %input0, { i64, i64 }* %0
+//   %input_unlowered_ptr =
+//       bitcast { i64, i64 }* %0 to %"struct.impala_udf::TimestampVal"*
+//   store { i64, i8* } %dst1, { i64, i8* }* %dst_lowered_ptr
+//   %dst_unlowered_ptr =
+//       bitcast { i64, i8* }* %dst_lowered_ptr to %"struct.impala_udf::StringVal"*
+//   call void @"void 
impala::AggregateFunctions::HllUpdate<impala_udf::TimestampVal>"(
+//       %"class.impala_udf::FunctionContext"* %agg_fn_ctx,
+//       %"struct.impala_udf::TimestampVal"* %input_unlowered_ptr,
+//       %"struct.impala_udf::StringVal"* %dst_unlowered_ptr)
+//   %anyval_result = load { i64, i8* }, { i64, i8* }* %dst_lowered_ptr
+//   %7 = extractvalue { i64, i8* } %anyval_result, 0
+//   %8 = ashr i64 %7, 32
+//   %9 = trunc i64 %8 to i32
+//   %10 = insertvalue %"struct.impala::StringValue" zeroinitializer, i32 %9, 1
+//   %11 = extractvalue { i64, i8* } %anyval_result, 1
+//   %12 = insertvalue %"struct.impala::StringValue" %10, i8* %11, 0
+//   store %"struct.impala::StringValue" %12, %"struct.impala::StringValue"* 
%dst_slot_ptr
 //   br label %ret
 //
 // ret:                                              ; preds = %entry
@@ -1555,12 +1490,9 @@ Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
 // }
 //
 Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen,
-    AggFnEvaluator* evaluator, int evaluator_idx, SlotDescriptor* slot_desc,
-    Function** fn) {
-  PointerType* fn_ctx_type =
-      codegen->GetPtrType(FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME);
-  PointerType* expr_ctxs_type =
-      codegen->GetPtrPtrType(codegen->GetType(ExprContext::LLVM_CLASS_NAME));
+    int agg_fn_idx, SlotDescriptor* slot_desc, Function** fn) {
+  PointerType* agg_fn_eval_type =
+      codegen->GetPtrType(AggFnEvaluator::LLVM_CLASS_NAME);
   StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
   if (tuple_struct == NULL) {
     return Status("PartitionedAggregationNode::CodegenUpdateSlot(): failed to 
generate "
@@ -1569,42 +1501,46 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen,
   PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct);
   PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
 
-  // Create UpdateSlot prototype
   LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type());
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_ctx", fn_ctx_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_expr_ctxs", expr_ctxs_type));
+  prototype.AddArgument(
+      LlvmCodeGen::NamedVariable("agg_fn_eval", agg_fn_eval_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
 
   LlvmBuilder builder(codegen->context());
-  Value* args[4];
+  Value* args[3];
   *fn = prototype.GeneratePrototype(&builder, &args[0]);
-  Value* agg_fn_ctx_arg = args[0];
-  Value* agg_expr_ctxs_arg = args[1];
-  Value* agg_tuple_arg = args[2];
-  Value* row_arg = args[3];
-
-  DCHECK_GE(evaluator->input_expr_ctxs().size(), 1);
+  Value* agg_fn_eval_arg = args[0];
+  Value* agg_tuple_arg = args[1];
+  Value* row_arg = args[2];
+
+  // Get the vector of input expressions' evaluators.
+  Value* input_evals_vector = codegen->CodegenCallFunction(&builder,
+      IRFunction::AGG_FN_EVALUATOR_INPUT_EVALUATORS, agg_fn_eval_arg,
+      "input_evals_vector");
+
+  AggFn* agg_fn = agg_fns_[agg_fn_idx];
+  const int num_inputs = agg_fn->GetNumChildren();
+  DCHECK_GE(num_inputs, 1);
   vector<CodegenAnyVal> input_vals;
-  for (int i = 0; i < evaluator->input_expr_ctxs().size(); ++i) {
-    ExprContext* agg_expr_ctx = evaluator->input_expr_ctxs()[i];
-    Expr* agg_expr = agg_expr_ctx->root();
-    Function* agg_expr_fn;
-    RETURN_IF_ERROR(agg_expr->GetCodegendComputeFn(codegen, &agg_expr_fn));
-    DCHECK(agg_expr_fn != NULL);
-
-    // Call expr function with the matching expr context to get src slot value.
-    Value* expr_ctx_ptr = builder.CreateInBoundsGEP(
-        agg_expr_ctxs_arg, codegen->GetIntConstant(TYPE_INT, i), "expr_ctx_ptr");
-    Value* expr_ctx = builder.CreateLoad(expr_ctx_ptr, "expr_ctx");
+  for (int i = 0; i < num_inputs; ++i) {
+    ScalarExpr* input_expr = agg_fn->GetChild(i);
+    Function* input_expr_fn;
+    RETURN_IF_ERROR(input_expr->GetCodegendComputeFn(codegen, &input_expr_fn));
+    DCHECK(input_expr_fn != NULL);
+
+    // Call input expr function with the matching evaluator to get src slot value.
+    Value* input_eval =
+        codegen->CodegenArrayAt(&builder, input_evals_vector, i, "input_eval");
     string input_name = Substitute("input$0", i);
-    input_vals.push_back(
-        CodegenAnyVal::CreateCallWrapped(codegen, &builder, agg_expr->type(), agg_expr_fn,
-            ArrayRef<Value*>({expr_ctx, row_arg}), input_name.c_str()));
+    CodegenAnyVal input_val = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
+        input_expr->type(), input_expr_fn, ArrayRef<Value*>({input_eval, row_arg}),
+        input_name.c_str());
+    input_vals.push_back(input_val);
   }
 
-  AggFnEvaluator::AggregationOp agg_op = evaluator->agg_op();
-  const ColumnType& dst_type = evaluator->intermediate_type();
+  AggFn::AggregationOp agg_op = agg_fn->agg_op();
+  const ColumnType& dst_type = agg_fn->intermediate_type();
   bool dst_is_int_or_float_or_bool = dst_type.IsIntegerType()
       || dst_type.IsFloatingPointType() || dst_type.IsBooleanType();
   bool dst_is_numeric_or_bool = dst_is_int_or_float_or_bool || dst_type.IsDecimalType();
@@ -1615,23 +1551,24 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen,
   // for special cases where we can emit a very simple instruction sequence, then fall
   // back to the general-purpose approach of calling the cross-compiled builtin UDA.
   CodegenAnyVal& src = input_vals[0];
+
   // 'dst_slot_ptr' points to the slot in the aggregate tuple to update.
   Value* dst_slot_ptr = builder.CreateStructGEP(
       NULL, agg_tuple_arg, slot_desc->llvm_field_idx(), "dst_slot_ptr");
   Value* result = NULL;
   Value* dst_value = builder.CreateLoad(dst_slot_ptr, "dst_val");
-  if (agg_op == AggFnEvaluator::COUNT) {
+  // TODO: consider moving the following codegen logic to AggFn.
+  if (agg_op == AggFn::COUNT) {
     src.CodegenBranchIfNull(&builder, ret_block);
-    if (evaluator->is_merge()) {
+    if (agg_fn->is_merge()) {
       result = builder.CreateAdd(dst_value, src.GetVal(), "count_sum");
     } else {
       result = builder.CreateAdd(
           dst_value, codegen->GetIntConstant(TYPE_BIGINT, 1), "count_inc");
     }
     DCHECK(!slot_desc->is_nullable());
-  } else if ((agg_op == AggFnEvaluator::MIN || agg_op == AggFnEvaluator::MAX)
-      && dst_is_numeric_or_bool) {
-    bool is_min = agg_op == AggFnEvaluator::MIN;
+  } else if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) && dst_is_numeric_or_bool) {
+    bool is_min = agg_op == AggFn::MIN;
     src.CodegenBranchIfNull(&builder, ret_block);
     Function* min_max_fn = codegen->CodegenMinMax(slot_desc->type(), is_min);
     Value* min_max_args[] = {dst_value, src.GetVal()};
@@ -1641,7 +1578,7 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen,
     DCHECK(slot_desc->is_nullable());
     slot_desc->CodegenSetNullIndicator(
         codegen, &builder, agg_tuple_arg, codegen->false_value());
-  } else if (agg_op == AggFnEvaluator::SUM && dst_is_int_or_float_or_bool) {
+  } else if (agg_op == AggFn::SUM && dst_is_int_or_float_or_bool) {
     src.CodegenBranchIfNull(&builder, ret_block);
     if (dst_type.IsFloatingPointType()) {
       result = builder.CreateFAdd(dst_value, src.GetVal());
@@ -1662,11 +1599,11 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen,
     // value of the slot was initialized in the right way in InitAggSlots() (e.g. 0 for
     // SUM) that we get the right result if UpdateSlot() pretends that the NULL bit of
     // 'dst' is unset. Empirically this optimisation makes TPC-H Q1 5-10% faster.
-    bool special_null_handling = !evaluator->intermediate_type().IsStringType()
-        && !evaluator->intermediate_type().IsTimestampType()
-        && (agg_op == AggFnEvaluator::MIN || agg_op == AggFnEvaluator::MAX
-               || agg_op == AggFnEvaluator::SUM || agg_op == AggFnEvaluator::AVG
-               || agg_op == AggFnEvaluator::NDV);
+    bool special_null_handling = !agg_fn->intermediate_type().IsStringType()
+        && !agg_fn->intermediate_type().IsTimestampType()
+        && (agg_op == AggFn::MIN || agg_op == AggFn::MAX
+               || agg_op == AggFn::SUM || agg_op == AggFn::AVG
+               || agg_op == AggFn::NDV);
     if (slot_desc->is_nullable()) {
       if (special_null_handling) {
         src.CodegenBranchIfNull(&builder, ret_block);
@@ -1678,10 +1615,17 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen,
     }
     dst.SetFromRawValue(dst_value);
 
+    // Get the FunctionContext object for the AggFnEvaluator.
+    Function* get_agg_fn_ctx_fn =
+        codegen->GetFunction(IRFunction::AGG_FN_EVALUATOR_AGG_FN_CTX, false);
+    DCHECK(get_agg_fn_ctx_fn != NULL);
+    Value* agg_fn_ctx_val =
+        builder.CreateCall(get_agg_fn_ctx_fn, {agg_fn_eval_arg}, "agg_fn_ctx");
+
     // Call the UDA to update/merge 'src' into 'dst', with the result stored in
     // 'updated_dst_val'.
     CodegenAnyVal updated_dst_val;
-    RETURN_IF_ERROR(CodegenCallUda(codegen, &builder, evaluator, agg_fn_ctx_arg,
+    RETURN_IF_ERROR(CodegenCallUda(codegen, &builder, agg_fn, agg_fn_ctx_val,
         input_vals, dst, &updated_dst_val));
     result = updated_dst_val.ToNativeValue();
 
@@ -1704,7 +1648,7 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen,
   // memory/CPU usage scales super-linearly with function size.
   // E.g. compute stats on all columns of a 1000-column table previously took 4 minutes to
   // codegen because all the UpdateSlot() functions were inlined.
-  if (evaluator_idx >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
+  if (agg_fn_idx >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
     codegen->SetNoInline(*fn);
   }
 
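
The SetNoInline() call above is the guard the comment refers to: beyond LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD aggregate functions, each UpdateSlot() stays an out-of-line call from UpdateTuple(), bounding generated-code size. The resulting shape, sketched with a hypothetical CodegenUpdateSlotFor() wrapper:

    for (int i = 0; i < agg_fns_.size(); ++i) {
      Function* update_slot_fn;
      RETURN_IF_ERROR(CodegenUpdateSlotFor(i, &update_slot_fn));  // hypothetical
      // The first few are inlined into UpdateTuple(); the rest remain calls.
      if (i >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
        codegen->SetNoInline(update_slot_fn);
      }
    }
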
@@ -1717,29 +1661,29 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen,
 }
 
 Status PartitionedAggregationNode::CodegenCallUda(LlvmCodeGen* codegen,
-    LlvmBuilder* builder, AggFnEvaluator* evaluator, Value* agg_fn_ctx_arg,
-    const vector<CodegenAnyVal>& input_vals, const CodegenAnyVal& dst,
+    LlvmBuilder* builder, AggFn* agg_fn, Value* agg_fn_ctx_val,
+    const vector<CodegenAnyVal>& input_vals, const CodegenAnyVal& dst_val,
     CodegenAnyVal* updated_dst_val) {
-  DCHECK_EQ(evaluator->input_expr_ctxs().size(), input_vals.size());
   Function* uda_fn;
-  RETURN_IF_ERROR(evaluator->GetUpdateOrMergeFunction(codegen, &uda_fn));
+  RETURN_IF_ERROR(agg_fn->CodegenUpdateOrMergeFunction(codegen, &uda_fn));
 
   // Set up arguments for call to UDA, which are the FunctionContext*, followed by
   // pointers to all input values, followed by a pointer to the destination value.
   vector<Value*> uda_fn_args;
-  uda_fn_args.push_back(agg_fn_ctx_arg);
+  uda_fn_args.push_back(agg_fn_ctx_val);
 
   // Create pointers to input args to pass to uda_fn. We must use the unlowered type,
   // e.g. IntVal, because the UDA interface expects the values to be passed as const
   // references to the classes.
-  for (int i = 0; i < evaluator->input_expr_ctxs().size(); ++i) {
+  DCHECK_EQ(agg_fn->GetNumChildren(), input_vals.size());
+  for (int i = 0; i < input_vals.size(); ++i) {
     uda_fn_args.push_back(input_vals[i].GetUnloweredPtr("input_unlowered_ptr"));
   }
 
   // Create pointer to dst to pass to uda_fn. We must use the unlowered type for the
   // same reason as above.
-  Value* dst_lowered_ptr = dst.GetLoweredPtr("dst_lowered_ptr");
-  const ColumnType& dst_type = evaluator->intermediate_type();
+  Value* dst_lowered_ptr = dst_val.GetLoweredPtr("dst_lowered_ptr");
+  const ColumnType& dst_type = agg_fn->intermediate_type();
   Type* dst_unlowered_ptr_type = CodegenAnyVal::GetUnloweredPtrType(codegen, dst_type);
   Value* dst_unlowered_ptr = builder->CreateBitCast(
       dst_lowered_ptr, dst_unlowered_ptr_type, "dst_unlowered_ptr");
@@ -1761,41 +1705,31 @@ Status PartitionedAggregationNode::CodegenCallUda(LlvmCodeGen* codegen,
 // For the query:
 // select count(*), count(int_col), sum(double_col) the IR looks like:
 //
-// ; Function Attrs: alwaysinline
 // define void @UpdateTuple(%"class.impala::PartitionedAggregationNode"* %this_ptr,
-//      %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, %"class.impala::Tuple"*
-//      %tuple,
-//      %"class.impala::TupleRow"* %row, i1 %is_merge) #34 {
+//     %"class.impala::AggFnEvaluator"** %agg_fn_evals, %"class.impala::Tuple"* %tuple,
+//     %"class.impala::TupleRow"* %row, i1 %is_merge) #33 {
 // entry:
-//   %tuple1 =
-//      bitcast %"class.impala::Tuple"* %tuple to { i8, [7 x i8], i64, i64, double }*
-//   %src_slot = getelementptr inbounds { i8, [7 x i8], i64, i64, double },
-//      { i8, [7 x i8], i64, i64, double }* %tuple1, i32 0, i32 2
+//   %tuple1 = bitcast %"class.impala::Tuple"* %tuple to <{ i64, i64, double, i8 }>*
+//   %src_slot = getelementptr inbounds <{ i64, i64, double, i8 }>,
+//       <{ i64, i64, double, i8 }>* %tuple1, i32 0, i32 0
 //   %count_star_val = load i64, i64* %src_slot
 //   %count_star_inc = add i64 %count_star_val, 1
 //   store i64 %count_star_inc, i64* %src_slot
-//   %0 = getelementptr %"class.impala_udf::FunctionContext"*,
-//      %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, i32 1
-//   %agg_fn_ctx = load %"class.impala_udf::FunctionContext"*,
-//      %"class.impala_udf::FunctionContext"** %0
-//   %1 = call %"class.impala::ExprContext"**
-//      @_ZNK6impala26PartitionedAggregationNode18GetAggExprContextsEi(
-//      %"class.impala::PartitionedAggregationNode"* %this_ptr, i32 1)
-//   call void @UpdateSlot(%"class.impala_udf::FunctionContext"* %agg_fn_ctx,
-//      %"class.impala::ExprContext"** %1, { i8, [7 x i8], i64, i64, double }* 
%tuple1,
-//      %"class.impala::TupleRow"* %row)
-//   %2 = getelementptr %"class.impala_udf::FunctionContext"*,
-//      %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, i32 2
-//   %agg_fn_ctx2 = load %"class.impala_udf::FunctionContext"*,
-//      %"class.impala_udf::FunctionContext"** %2
-//   %3 = call %"class.impala::ExprContext"**
-//      @_ZNK6impala26PartitionedAggregationNode18GetAggExprContextsEi(
-//      %"class.impala::PartitionedAggregationNode"* %this_ptr, i32 2)
-//   call void @UpdateSlot.4(%"class.impala_udf::FunctionContext"* %agg_fn_ctx2,
-//      %"class.impala::ExprContext"** %3, { i8, [7 x i8], i64, i64, double }* %tuple1,
-//      %"class.impala::TupleRow"* %row)
+//   %0 = getelementptr %"class.impala::AggFnEvaluator"*,
+//       %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 1
+//   %agg_fn_eval =
+//       load %"class.impala::AggFnEvaluator"*, 
%"class.impala::AggFnEvaluator"** %0
+//   call void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
+//       <{ i64, i64, double, i8 }>* %tuple1, %"class.impala::TupleRow"* %row)
+//   %1 = getelementptr %"class.impala::AggFnEvaluator"*,
+//       %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 2
+//   %agg_fn_eval2 =
+//       load %"class.impala::AggFnEvaluator"*, 
%"class.impala::AggFnEvaluator"** %1
+//   call void @UpdateSlot.2(%"class.impala::AggFnEvaluator"* %agg_fn_eval2,
+//       <{ i64, i64, double, i8 }>* %tuple1, %"class.impala::TupleRow"* %row)
 //   ret void
 // }
+//
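
For orientation, the generated UpdateTuple above behaves like this hand-written C++ (a sketch; IntermediateTuple mirrors the <{ i64, i64, double, i8 }> layout and is illustrative):

    void UpdateTuple(PartitionedAggregationNode* node, AggFnEvaluator** agg_fn_evals,
        Tuple* tuple, TupleRow* row, bool is_merge) {
      auto* t = reinterpret_cast<IntermediateTuple*>(tuple);
      ++t->count_star;                      // count(*) is updated inline, no call
      UpdateSlot(agg_fn_evals[1], t, row);  // count(int_col)
      UpdateSlot(agg_fn_evals[2], t, row);  // sum(double_col)
    }
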
 Status PartitionedAggregationNode::CodegenUpdateTuple(
     LlvmCodeGen* codegen, Function** fn) {
   SCOPED_TIMER(codegen->codegen_timer());
@@ -1814,12 +1748,11 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(
 
   // Get the types to match the UpdateTuple signature
   Type* agg_node_type = codegen->GetType(PartitionedAggregationNode::LLVM_CLASS_NAME);
-  Type* fn_ctx_type = codegen->GetType(FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME);
   Type* tuple_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
   Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
 
   PointerType* agg_node_ptr_type = codegen->GetPtrType(agg_node_type);
-  PointerType* fn_ctx_ptr_ptr_type = codegen->GetPtrPtrType(fn_ctx_type);
+  PointerType* evals_type = 
codegen->GetPtrPtrType(AggFnEvaluator::LLVM_CLASS_NAME);
   PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_type);
   PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type);
 
@@ -1827,7 +1760,7 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(
   PointerType* tuple_ptr = codegen->GetPtrType(tuple_struct);
   LlvmCodeGen::FnPrototype prototype(codegen, "UpdateTuple", codegen->void_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", agg_node_ptr_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_ctxs", fn_ctx_ptr_ptr_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_evals", evals_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("is_merge", codegen->boolean_type()));
@@ -1835,8 +1768,7 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(
   LlvmBuilder builder(codegen->context());
   Value* args[5];
   *fn = prototype.GeneratePrototype(&builder, &args[0]);
-  Value* this_arg = args[0];
-  Value* agg_fn_ctxs_arg = args[1];
+  Value* agg_fn_evals_arg = args[1];
   Value* tuple_arg = args[2];
   Value* row_arg = args[3];
 
@@ -1844,17 +1776,13 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(
   // TODO: get rid of this by using right type in function signature
   tuple_arg = builder.CreateBitCast(tuple_arg, tuple_ptr, "tuple");
 
-  Function* get_expr_ctxs_fn =
-      codegen->GetFunction(IRFunction::PART_AGG_NODE_GET_EXPR_CTXS, false);
-  DCHECK(get_expr_ctxs_fn != NULL);
-
   // Loop over each expr and generate the IR for that slot.  If the expr is not
   // count(*), generate a helper IR function to update the slot and call that.
-  int j = grouping_expr_ctxs_.size();
-  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) {
+  int j = grouping_exprs_.size();
+  for (int i = 0; i < agg_fns_.size(); ++i, ++j) {
     SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
-    AggFnEvaluator* evaluator = aggregate_evaluators_[i];
-    if (evaluator->is_count_star()) {
+    AggFn* agg_fn = agg_fns_[i];
+    if (agg_fn->is_count_star()) {
       // TODO: we should be able to hoist this up to the loop over the batch and just
       // increment the slot by the number of rows in the batch.
       int field_idx = slot_desc->llvm_field_idx();
@@ -1865,15 +1793,14 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(
       builder.CreateStore(count_inc, slot_ptr);
     } else {
       Function* update_slot_fn;
-      RETURN_IF_ERROR(
-          CodegenUpdateSlot(codegen, evaluator, i, slot_desc, &update_slot_fn));
-      Value* agg_fn_ctx_ptr = builder.CreateConstGEP1_32(agg_fn_ctxs_arg, i);
-      Value* agg_fn_ctx = builder.CreateLoad(agg_fn_ctx_ptr, "agg_fn_ctx");
-      // Call GetExprCtx() to get the expression context.
-      DCHECK(agg_expr_ctxs_[i] != NULL);
-      Value* get_expr_ctxs_args[] = {this_arg, codegen->GetIntConstant(TYPE_INT, i)};
-      Value* agg_expr_ctxs = builder.CreateCall(get_expr_ctxs_fn, get_expr_ctxs_args);
-      Value* update_slot_args[] = {agg_fn_ctx, agg_expr_ctxs, tuple_arg, row_arg};
+      RETURN_IF_ERROR(CodegenUpdateSlot(codegen, i, slot_desc, &update_slot_fn));
+
+      // Load agg_fn_evals_[i]
+      Value* agg_fn_eval_val =
+          codegen->CodegenArrayAt(&builder, agg_fn_evals_arg, i, "agg_fn_eval");
+
+      // Call UpdateSlot(agg_fn_evals_[i], tuple, row);
+      Value* update_slot_args[] = {agg_fn_eval_val, tuple_arg, row_arg};
       builder.CreateCall(update_slot_fn, update_slot_args);
     }
   }
@@ -1881,7 +1808,7 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(
 
   // Avoid inlining big UpdateTuple function into outer loop - we're unlikely to get
   // any benefit from it since the function call overhead will be amortized.
-  if (aggregate_evaluators_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
+  if (agg_fns_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
     codegen->SetNoInline(*fn);
   }
 
@@ -1902,14 +1829,14 @@ Status PartitionedAggregationNode::CodegenProcessBatch(LlvmCodeGen* codegen,
   RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn));
 
   // Get the cross compiled update row batch function
-  IRFunction::Type ir_fn = (!grouping_expr_ctxs_.empty() ?
+  IRFunction::Type ir_fn = (!grouping_exprs_.empty() ?
       IRFunction::PART_AGG_NODE_PROCESS_BATCH_UNAGGREGATED :
       IRFunction::PART_AGG_NODE_PROCESS_BATCH_NO_GROUPING);
   Function* process_batch_fn = codegen->GetFunction(ir_fn, true);
   DCHECK(process_batch_fn != NULL);
 
   int replaced;
-  if (!grouping_expr_ctxs_.empty()) {
+  if (!grouping_exprs_.empty()) {
     // Codegen for grouping using hash table
 
     // Replace prefetch_mode with constant so branches can be optimised out.
@@ -1960,7 +1887,7 @@ Status PartitionedAggregationNode::CodegenProcessBatch(LlvmCodeGen* codegen,
         "ProcessBatch() function failed verification, see log");
   }
 
-  void **codegened_fn_ptr = grouping_expr_ctxs_.empty() ?
+  void **codegened_fn_ptr = grouping_exprs_.empty() ?
       reinterpret_cast<void**>(&process_batch_no_grouping_fn_) :
       reinterpret_cast<void**>(&process_batch_fn_);
   codegen->AddFunctionToJit(process_batch_fn, codegened_fn_ptr);
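
The net effect of the CodegenUpdateTuple() change above is that the generated
UpdateTuple() body now receives the evaluator array directly and performs one
indexed load per aggregate function, instead of loading a FunctionContext and
calling back into the plan node for the expression contexts. A minimal
standalone C++ sketch of the new calling convention follows; Evaluator, Tuple
and TupleRow are hypothetical stand-ins, not Impala's real classes.

#include <cstddef>

struct Tuple;
struct TupleRow;

// Stand-in for AggFnEvaluator: it owns its input exprs, so UpdateSlot() no
// longer needs a FunctionContext plus a separate ExprContext array.
struct Evaluator {
  void UpdateSlot(Tuple* tuple, TupleRow* row) { /* update one aggregate slot */ }
};

// Shape of the generated UpdateTuple() body after this change: one indexed
// load from the evaluator array per aggregate function, then a direct call.
void UpdateTupleShape(Evaluator** agg_fn_evals, size_t num_fns,
                      Tuple* tuple, TupleRow* row) {
  for (size_t i = 0; i < num_fns; ++i) {
    Evaluator* eval = agg_fn_evals[i];  // mirrors CodegenArrayAt(..., i, "agg_fn_eval")
    eval->UpdateSlot(tuple, row);       // mirrors the call to the codegen'd UpdateSlot
  }
}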

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index 2155473..ccac45b 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -37,6 +37,7 @@ class Value;
 
 namespace impala {
 
+class AggFn;
 class AggFnEvaluator;
 class CodegenAnyVal;
 class LlvmCodeGen;
@@ -139,7 +140,7 @@ class PartitionedAggregationNode : public ExecNode {
   static const char* LLVM_CLASS_NAME;
 
  protected:
-  /// Frees local allocations from aggregate_evaluators_ and agg_fn_ctxs
+  /// Frees local allocations from agg_fn_evals_ and the partitions' agg_fn_evals.
   virtual Status QueryMaintenance(RuntimeState* state);
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
@@ -180,7 +181,11 @@ class PartitionedAggregationNode : public ExecNode {
   TupleDescriptor* intermediate_tuple_desc_;
 
   /// Row with the intermediate tuple as its only tuple.
-  boost::scoped_ptr<RowDescriptor> intermediate_row_desc_;
+  /// Construct a new row desc for preparing the build exprs because neither the child's
+  /// nor this node's output row desc may contain the intermediate tuple, e.g.,
+  /// in a single-node plan with an intermediate tuple different from the output tuple.
+  /// Lives in the query state's obj_pool.
+  RowDescriptor* intermediate_row_desc_;
 
   /// Tuple into which Finalize() results are stored. Possibly the same as
   /// the intermediate tuple.
@@ -195,39 +200,32 @@ class PartitionedAggregationNode : public ExecNode {
 
   /// True if this is first phase of a two-phase distributed aggregation for which we
   /// are doing a streaming preaggregation.
-  bool is_streaming_preagg_;
+  const bool is_streaming_preagg_;
 
   /// True if any of the evaluators require the serialize step.
   bool needs_serialize_;
 
   /// The list of all aggregate operations for this exec node.
-  std::vector<AggFnEvaluator*> aggregate_evaluators_;
-
-  /// Cache of the ExprContexts of 'aggregate_evaluators_'. Used in the codegen'ed
-  /// version of UpdateTuple() to avoid loading aggregate_evaluators_[i] at runtime.
-  /// An entry is NULL if the aggregate evaluator is not codegen'ed or there is no Expr
-  /// in the aggregate evaluator (e.g. count(*)).
-  std::vector<ExprContext* const*> agg_expr_ctxs_;
-
-  /// FunctionContext for each aggregate function and backing MemPool. String data
-  /// returned by the aggregate functions is allocated via these contexts.
-  /// These contexts are only passed to the evaluators in the non-partitioned
-  /// (non-grouping) case. Otherwise they are only used to clone FunctionContexts for the
-  /// partitions.
+  std::vector<AggFn*> agg_fns_;
+
+  /// Evaluators for each aggregate function and backing MemPool. String data
+  /// returned by the aggregate functions is allocated via these evaluators.
+  /// These evaluators are only used in the non-grouping case. For queries with
+  /// a GROUP BY clause, each partition clones these evaluators.
   /// TODO: we really need to plumb through CHAR(N) for intermediate types.
-  std::vector<impala_udf::FunctionContext*> agg_fn_ctxs_;
+  std::vector<AggFnEvaluator*> agg_fn_evals_;
   boost::scoped_ptr<MemPool> agg_fn_pool_;
 
   /// Exprs used to evaluate input rows
-  std::vector<ExprContext*> grouping_expr_ctxs_;
+  std::vector<ScalarExpr*> grouping_exprs_;
 
   /// Exprs used to insert constructed aggregation tuple into the hash table.
   /// All the exprs are simply SlotRefs for the intermediate tuple.
-  std::vector<ExprContext*> build_expr_ctxs_;
+  std::vector<ScalarExpr*> build_exprs_;
 
-  /// Indices of grouping exprs with var-len string types in grouping_expr_ctxs_. We need
-  /// to do more work for var-len expressions when allocating and spilling rows. All
-  /// var-len grouping exprs have type string.
+  /// Indices of grouping exprs with var-len string types in grouping_exprs_.
+  /// We need to do more work for var-len expressions when allocating and spilling rows.
+  /// All var-len grouping exprs have type string.
   std::vector<int> string_grouping_exprs_;
 
   RuntimeState* state_;
@@ -325,6 +323,8 @@ class PartitionedAggregationNode : public ExecNode {
   bool child_eos_;
 
   /// Used for hash-related functionality, such as evaluating rows and calculating hashes.
+  /// It also owns the evaluators for the grouping and build expressions used during hash
+  /// table insertion and probing.
   boost::scoped_ptr<HashTableCtx> ht_ctx_;
 
   /// Object pool that holds the Partition objects in hash_partitions_.
@@ -395,8 +395,8 @@ class PartitionedAggregationNode : public ExecNode {
     /// is spilled).
     boost::scoped_ptr<HashTable> hash_tbl;
 
-    /// Clone of parent's agg_fn_ctxs_ and backing MemPool.
-    std::vector<impala_udf::FunctionContext*> agg_fn_ctxs;
+    /// Clone of parent's agg_fn_evals_ and backing MemPool.
+    std::vector<AggFnEvaluator*> agg_fn_evals;
     boost::scoped_ptr<MemPool> agg_fn_pool;
 
     /// Tuple stream used to store aggregated rows. When the partition is not spilled,
@@ -432,54 +432,53 @@ class PartitionedAggregationNode : public ExecNode {
   /// Copies string data from the specified slot into 'pool', and sets the StringValues'
   /// ptrs to the copied data. Copies data from all tuples in 'row_batch' from
   /// 'first_row_idx' onwards. 'slot_desc' must have a var-len string type.
-  Status CopyStringData(const SlotDescriptor* slot_desc, RowBatch* row_batch,
+  Status CopyStringData(const SlotDescriptor& slot_desc, RowBatch* row_batch,
       int first_row_idx, MemPool* pool);
 
   /// Constructs singleton output tuple, allocating memory from pool.
   Tuple* ConstructSingletonOutputTuple(
-      const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, MemPool* pool);
+      const std::vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool);
 
   /// Copies grouping values stored in 'ht_ctx_' that were computed over 'current_row_'
-  /// using 'grouping_expr_ctxs_'. Aggregation expr slots are set to their initial values.
-  /// Returns NULL if there was not enough memory to allocate the tuple or an error
-  /// occurred. When returning NULL, sets *status.  Allocates tuple and var-len data for
+  /// using 'grouping_expr_evals_'. Aggregation expr slots are set to their initial
+  /// values. Returns NULL if there was not enough memory to allocate the tuple or an
+  /// error occurred, in which case 'status' is set. Allocates tuple and var-len data for
   /// grouping exprs from stream. Var-len data for aggregate exprs is allocated from the
   /// FunctionContexts, so is stored outside the stream. If stream's small buffers get
   /// full, it will attempt to switch to IO-buffers.
-  Tuple* ConstructIntermediateTuple(
-      const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs,
+  Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals,
       BufferedTupleStream* stream, Status* status) noexcept;
 
   /// Constructs intermediate tuple, allocating memory from pool instead of the stream.
   /// Returns NULL and sets status if there is not enough memory to allocate the tuple.
-  Tuple* ConstructIntermediateTuple(
-      const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, MemPool* pool,
-      Status* status) noexcept;
+  Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals,
+      MemPool* pool, Status* status) noexcept;
 
   /// Returns the number of bytes of variable-length data for the grouping values stored
   /// in 'ht_ctx_'.
   int GroupingExprsVarlenSize();
 
   /// Initializes intermediate tuple by copying grouping values stored in 'ht_ctx_' that
-  /// that were computed over 'current_row_' using 'grouping_expr_ctxs_'. Writes the
+  /// were computed over 'current_row_' using 'grouping_expr_evals_'. Writes the
   /// var-len data into buffer. 'buffer' points to the start of a buffer of at least the
   /// size of the variable-length data: 'varlen_size'.
   void CopyGroupingValues(Tuple* intermediate_tuple, uint8_t* buffer, int varlen_size);
 
   /// Initializes the aggregate function slots of an intermediate tuple.
   /// Any var-len data is allocated from the FunctionContexts.
-  void InitAggSlots(const vector<impala_udf::FunctionContext*>& agg_fn_ctxs,
+  void InitAggSlots(const std::vector<AggFnEvaluator*>& agg_fn_evals,
       Tuple* intermediate_tuple);
 
   /// Updates the given aggregation intermediate tuple with aggregation values computed
-  /// over 'row' using 'agg_fn_ctxs'. Whether the agg fn evaluator calls Update() or
+  /// over 'row' using 'agg_fn_evals'. Whether the agg fn evaluator calls Update() or
   /// Merge() is controlled by the evaluator itself, unless enforced explicitly by passing
   /// in is_merge == true. The override is needed to merge spilled and non-spilled rows
   /// belonging to the same partition independent of whether the agg fn evaluators have
   /// is_merge() == true.
   /// This function is replaced by codegen (which is why we don't use a vector argument
-  /// for agg_fn_ctxs).. Any var-len data is allocated from the FunctionContexts.
-  void UpdateTuple(impala_udf::FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row,
+  /// for agg_fn_evals). Any var-len data is allocated from the FunctionContexts.
+  /// TODO: Fix the argument order. Need to update CodegenUpdateTuple() too.
+  void UpdateTuple(AggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row,
       bool is_merge = false) noexcept;
       bool is_merge = false) noexcept;
 
   /// Called on the intermediate tuple of each group after all input rows have been
@@ -491,7 +490,7 @@ class PartitionedAggregationNode : public ExecNode {
   /// the finalized/serialized aggregate values is returned.
   /// TODO: Coordinate the allocation of new tuples with the release of memory
   /// so as not to make memory consumption blow up.
-  Tuple* GetOutputTuple(const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs,
+  Tuple* GetOutputTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals,
       Tuple* tuple, MemPool* pool);
 
   /// Do the aggregation for all tuple rows in the batch when there is no grouping.
@@ -527,11 +526,6 @@ class PartitionedAggregationNode : public ExecNode {
   template <bool AGGREGATED_ROWS>
   Status IR_ALWAYS_INLINE ProcessRow(TupleRow* row, HashTableCtx* ht_ctx);
 
-  /// Accessor for the expression contexts of an AggFnEvaluator. Returns an array of
-  /// pointers the the AggFnEvaluator's expression contexts. Used only in codegen'ed
-  /// version of UpdateTuple().
-  ExprContext* const* IR_ALWAYS_INLINE GetAggExprContexts(int i) const;
-
   /// Create a new intermediate tuple in partition, initialized with row. ht_ctx is
   /// the context for the partition's hash table and hash is the precomputed hash of
   /// the row. The row can be an unaggregated or aggregated row depending on
@@ -637,23 +631,23 @@ class PartitionedAggregationNode : public ExecNode {
   void ClosePartitions();
 
   /// Calls finalizes on all tuples starting at 'it'.
-  void CleanupHashTbl(const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs,
+  void CleanupHashTbl(const std::vector<AggFnEvaluator*>& agg_fn_evals,
       HashTable::Iterator it);
 
-  /// Codegen UpdateSlot(). Returns non-OK status if codegen is unsuccessful.
-  /// Assumes is_merge = false;
-  Status CodegenUpdateSlot(LlvmCodeGen* codegen, AggFnEvaluator* evaluator,
-      int evaluator_idx, SlotDescriptor* slot_desc, llvm::Function** fn);
+  /// Codegens the update of the aggregate expression agg_fns_[agg_fn_idx] and
+  /// returns the IR function in 'fn'. Returns non-OK status if codegen
+  /// is unsuccessful.
+  Status CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
+      SlotDescriptor* slot_desc, llvm::Function** fn);
 
   /// Codegen a call to a function implementing the UDA interface with input values
   /// from 'input_vals'. 'dst_val' should contain the previous value of the aggregate
   /// function, and 'updated_dst_val' is set to the new value after the Update or Merge
   /// operation is applied. The instruction sequence for the UDA call is inserted at
   /// the insert position of 'builder'.
-  Status CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder,
-      AggFnEvaluator* evaluator, llvm::Value* agg_fn_ctx_arg,
-      const std::vector<CodegenAnyVal>& input_vals, const CodegenAnyVal& dst_val,
-      CodegenAnyVal* updated_dst_val);
+  Status CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder, AggFn* agg_fn,
+      llvm::Value* agg_fn_ctx_arg, const std::vector<CodegenAnyVal>& input_vals,
+      const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val);
 
   /// Codegen UpdateTuple(). Returns non-OK status if codegen is unsuccessful.
   Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn);
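
The header comments above state that agg_fn_evals_ is used directly only in the
non-grouping case, and that each hash partition clones the evaluators for
grouping queries. A hedged sketch of that clone-per-partition pattern follows;
the types and the copy-based Clone() are illustrative, not Impala's real
AggFnEvaluator API.

#include <memory>
#include <vector>

// Stand-in for AggFnEvaluator; the real class is cloned through an
// Impala-specific API, this copyable struct is purely illustrative.
struct Evaluator {
  std::unique_ptr<Evaluator> Clone() const { return std::make_unique<Evaluator>(*this); }
};

struct Partition {
  std::vector<std::unique_ptr<Evaluator>> agg_fn_evals;  // per-partition clones
};

// Each partition gets its own evaluator clones so per-partition state
// (e.g. MemPool-backed string results) stays independent of the parent's.
void InitPartitionEvals(const std::vector<std::unique_ptr<Evaluator>>& parent_evals,
                        Partition* partition) {
  partition->agg_fn_evals.reserve(parent_evals.size());
  for (const auto& eval : parent_evals) {
    partition->agg_fn_evals.push_back(eval->Clone());
  }
}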

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-hash-join-builder-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc
index be980c7..e5f649e 100644
--- a/be/src/exec/partitioned-hash-join-builder-ir.cc
+++ b/be/src/exec/partitioned-hash-join-builder-ir.cc
@@ -59,7 +59,7 @@ Status PhjBuilder::ProcessBuildBatch(
       DCHECK_EQ(ctx->level(), 0)
           << "Runtime filters should not be built during repartitioning.";
       // TODO: unroll loop and codegen expr evaluation and hashing (IMPALA-3360).
-      for (const FilterContext& ctx : filters_) ctx.Insert(build_row);
+      for (const FilterContext& ctx : filter_ctxs_) ctx.Insert(build_row);
     }
     const uint32_t hash = expr_vals_cache->CurExprValuesHash();
     const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
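
ProcessBuildBatch() above only feeds rows into the runtime filters when
ht_ctx->level() == 0, i.e. on the first pass over the build side; the DCHECK
enforces that repartitioning passes never rebuild them, so each row is inserted
exactly once. A simplified sketch of that guard, with FilterContext and
TupleRow as stand-ins for the real types:

#include <vector>

struct TupleRow;

// Stand-in for Impala's FilterContext; conceptually, Insert() evaluates the
// filter's source expr on the row and sets a bit in the local bloom filter.
struct FilterContext {
  void Insert(TupleRow* row) const { /* hash expr value, set bloom filter bit */ }
};

// Rows feed the runtime filters only on the level-0 pass; repartitioning
// passes (level > 0) skip them so no row is inserted twice.
void BuildFilters(const std::vector<FilterContext>& filter_ctxs, int level,
                  const std::vector<TupleRow*>& build_rows) {
  if (level != 0) return;
  for (TupleRow* row : build_rows) {
    for (const FilterContext& ctx : filter_ctxs) ctx.Insert(row);
  }
}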

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 57a690c..a17e295 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -23,8 +23,8 @@
 
 #include "codegen/llvm-codegen.h"
 #include "exec/hash-table.inline.h"
-#include "exprs/expr-context.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/buffered-tuple-stream.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
@@ -75,31 +75,41 @@ PhjBuilder::PhjBuilder(int join_node_id, TJoinOp::type join_op,
     insert_batch_fn_(NULL),
     insert_batch_fn_level0_(NULL) {}
 
-Status PhjBuilder::Init(RuntimeState* state,
+Status PhjBuilder::Init(const vector<TExpr>& thrift_output_exprs,
+    const TDataSink& tsink, RuntimeState* state) {
+  return Status::OK();
+}
+
+Status PhjBuilder::InitExprsAndFilters(RuntimeState* state,
     const vector<TEqJoinCondition>& eq_join_conjuncts,
-    const vector<TRuntimeFilterDesc>& filters) {
+    const vector<TRuntimeFilterDesc>& filter_descs) {
   for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
-    ExprContext* ctx;
-    RETURN_IF_ERROR(Expr::CreateExprTree(&pool_, eq_join_conjunct.right, &ctx));
-    build_expr_ctxs_.push_back(ctx);
+    ScalarExpr* build_expr;
+    RETURN_IF_ERROR(
+        ScalarExpr::Create(eq_join_conjunct.right, row_desc_, state, &build_expr));
+    build_exprs_.push_back(build_expr);
     is_not_distinct_from_.push_back(eq_join_conjunct.is_not_distinct_from);
   }
 
-  for (const TRuntimeFilterDesc& filter : filters) {
-    // If filter propagation not enabled, only consider building broadcast joins (that may
-    // be consumed by this fragment).
-    if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL
-        && !filter.is_broadcast_join) {
+  for (const TRuntimeFilterDesc& filter_desc : filter_descs) {
+    // If filter propagation not enabled, only consider building broadcast joins (that
+    // may be consumed by this fragment).
+    if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL &&
+        !filter_desc.is_broadcast_join) {
       continue;
     }
-    if (state->query_options().disable_row_runtime_filtering
-        && !filter.applied_on_partition_columns) {
+    if (state->query_options().disable_row_runtime_filtering &&
+        !filter_desc.applied_on_partition_columns) {
       continue;
     }
-    FilterContext filter_ctx;
-    filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, true);
-    RETURN_IF_ERROR(Expr::CreateExprTree(&pool_, filter.src_expr, &filter_ctx.expr_ctx));
-    filters_.push_back(filter_ctx);
+    ScalarExpr* filter_expr;
+    RETURN_IF_ERROR(
+        ScalarExpr::Create(filter_desc.src_expr, row_desc_, state, &filter_expr));
+    filter_exprs_.push_back(filter_expr);
+
+    // TODO: Move to Prepare().
+    filter_ctxs_.emplace_back();
+    filter_ctxs_.back().filter = state->filter_bank()->RegisterFilter(filter_desc, true);
   }
   return Status::OK();
 }
@@ -108,21 +118,25 @@ string PhjBuilder::GetName() {
   return Substitute("Hash Join Builder (join_node_id=$0)", join_node_id_);
 }
 
+void PhjBuilder::FreeLocalAllocations() const {
+  if (ht_ctx_.get() != nullptr) ht_ctx_->FreeLocalAllocations();
+  for (const FilterContext& ctx : filter_ctxs_) {
+    if (ctx.expr_eval != nullptr) ctx.expr_eval->FreeLocalAllocations();
+  }
+}
+
 Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
-  RETURN_IF_ERROR(
-      Expr::Prepare(build_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get()));
-  expr_ctxs_to_free_.insert(
-      expr_ctxs_to_free_.end(), build_expr_ctxs_.begin(), build_expr_ctxs_.end());
-
-  for (const FilterContext& ctx : filters_) {
-    RETURN_IF_ERROR(ctx.expr_ctx->Prepare(state, row_desc_, expr_mem_tracker_.get()));
-    expr_ctxs_to_free_.push_back(ctx.expr_ctx);
-  }
-  RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, build_expr_ctxs_,
+  RETURN_IF_ERROR(HashTableCtx::Create(&pool_, state, build_exprs_, build_exprs_,
       HashTableStoresNulls(), is_not_distinct_from_, state->fragment_hash_seed(),
-      MAX_PARTITION_DEPTH, row_desc_.tuple_descriptors().size(), mem_tracker_.get(),
+      MAX_PARTITION_DEPTH, row_desc_.tuple_descriptors().size(), expr_mem_pool(),
       &ht_ctx_));
+
+  DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size());
+  for (int i = 0; i < filter_exprs_.size(); ++i) {
+    RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, &pool_,
+        expr_mem_pool(), &filter_ctxs_[i].expr_eval));
+  }
   RETURN_IF_ERROR(state->block_mgr()->RegisterClient(
       Substitute("PartitionedHashJoin id=$0 builder=$1", join_node_id_, this),
       MinRequiredBuffers(), true, mem_tracker_.get(), state, &block_mgr_client_));
@@ -150,9 +164,10 @@ Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker)
 }
 
 Status PhjBuilder::Open(RuntimeState* state) {
-  RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state));
-  for (const FilterContext& filter : filters_) {
-    RETURN_IF_ERROR(filter.expr_ctx->Open(state));
+  RETURN_IF_ERROR(ht_ctx_->Open(state));
+
+  for (const FilterContext& ctx : filter_ctxs_) {
+    RETURN_IF_ERROR(ctx.expr_eval->Open(state));
   }
   RETURN_IF_ERROR(CreateHashPartitions(0));
   AllocateRuntimeFilters();
@@ -165,7 +180,7 @@ Status PhjBuilder::Open(RuntimeState* state) {
 
 Status PhjBuilder::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(partition_build_rows_timer_);
-  bool build_filters = ht_ctx_->level() == 0 && filters_.size() > 0;
+  bool build_filters = ht_ctx_->level() == 0 && filter_ctxs_.size() > 0;
   if (process_build_batch_fn_ == NULL) {
     RETURN_IF_ERROR(ProcessBuildBatch(batch, ht_ctx_.get(), build_filters));
   } else {
@@ -179,7 +194,7 @@ Status PhjBuilder::Send(RuntimeState* state, RowBatch* batch) {
   }
 
   // Free any local allocations made during partitioning.
-  ExprContext::FreeLocalAllocations(expr_ctxs_to_free_);
+  FreeLocalAllocations();
   COUNTER_ADD(num_build_rows_partitioned_, batch->num_rows());
   return Status::OK();
 }
@@ -225,19 +240,23 @@ Status PhjBuilder::FlushFinal(RuntimeState* state) {
 
 void PhjBuilder::Close(RuntimeState* state) {
   if (closed_) return;
-  ExprContext::FreeLocalAllocations(expr_ctxs_to_free_);
+  FreeLocalAllocations();
   CloseAndDeletePartitions();
-  if (ht_ctx_ != NULL) ht_ctx_->Close();
-  Expr::Close(build_expr_ctxs_, state);
-  for (const FilterContext& ctx : filters_) ctx.expr_ctx->Close(state);
+  if (ht_ctx_ != nullptr) ht_ctx_->Close(state);
+  ht_ctx_.reset();
+  for (const FilterContext& ctx : filter_ctxs_) {
+    if (ctx.expr_eval != nullptr) ctx.expr_eval->Close(state);
+  }
+  ScalarExpr::Close(filter_exprs_);
   if (block_mgr_client_ != NULL) state->block_mgr()->ClearReservations(block_mgr_client_);
+  ScalarExpr::Close(build_exprs_);
   pool_.Clear();
   DataSink::Close(state);
   closed_ = true;
 }
 
 void PhjBuilder::Reset() {
-  ExprContext::FreeLocalAllocations(expr_ctxs_to_free_);
+  FreeLocalAllocations();
   non_empty_build_ = false;
   CloseAndDeletePartitions();
 }
@@ -448,13 +467,13 @@ void PhjBuilder::CloseAndDeletePartitions() {
 }
 
 void PhjBuilder::AllocateRuntimeFilters() {
-  DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filters_.size() == 0)
+  DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filter_ctxs_.size() == 0)
       << "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN";
   DCHECK(ht_ctx_ != NULL);
-  for (int i = 0; i < filters_.size(); ++i) {
-    filters_[i].local_bloom_filter =
+  for (int i = 0; i < filter_ctxs_.size(); ++i) {
+    filter_ctxs_[i].local_bloom_filter =
         runtime_state_->filter_bank()->AllocateScratchBloomFilter(
-            filters_[i].filter->id());
+            filter_ctxs_[i].filter->id());
   }
 }
 
@@ -467,7 +486,7 @@ void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
   // poor estimate of the NDV - particularly if the filter expression is a function of
   // several columns.
   // TODO: Better heuristic.
-  for (const FilterContext& ctx : filters_) {
+  for (const FilterContext& ctx : filter_ctxs_) {
     // TODO: Consider checking actual number of bits set in filter to compute FP rate.
     // TODO: Consider checking this every few batches or so.
     bool fp_rate_too_high = runtime_state_->filter_bank()->FpRateTooHigh(
@@ -478,15 +497,15 @@ void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
     num_enabled_filters += !fp_rate_too_high;
   }
 
-  if (filters_.size() > 0) {
+  if (filter_ctxs_.size() > 0) {
     string info_string;
-    if (num_enabled_filters == filters_.size()) {
-      info_string = Substitute("$0 of $0 Runtime Filter$1 Published", filters_.size(),
-          filters_.size() == 1 ? "" : "s");
+    if (num_enabled_filters == filter_ctxs_.size()) {
+      info_string = Substitute("$0 of $0 Runtime Filter$1 Published", filter_ctxs_.size(),
+          filter_ctxs_.size() == 1 ? "" : "s");
     } else {
       info_string = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled",
-          num_enabled_filters, filters_.size(), filters_.size() == 1 ? "" : "s",
-          filters_.size() - num_enabled_filters);
+          num_enabled_filters, filter_ctxs_.size(), filter_ctxs_.size() == 1 ? "" : "s",
+          filter_ctxs_.size() - num_enabled_filters);
     }
     profile()->AddInfoString("Runtime filters", info_string);
   }
@@ -689,7 +708,7 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) {
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->GetQueryStatus());
     // Free any local allocations made while inserting.
-    ExprContext::FreeLocalAllocations(parent_->expr_ctxs_to_free_);
+    parent_->FreeLocalAllocations();
     batch.Reset();
   } while (!eos);
 
@@ -791,9 +810,9 @@ Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen,
 
   // Always build runtime filters at level0 (if there are any).
   // Note that the first argument of this function is the return value.
-  Value* build_filters_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 4);
-  build_filters_l0_arg->replaceAllUsesWith(
-      ConstantInt::get(Type::getInt1Ty(codegen->context()), filters_.size() > 0));
+  Value* build_filter_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 4);
+  build_filter_l0_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt1Ty(codegen->context()), filter_ctxs_.size() > 0));
 
   // process_build_batch_fn_level0 uses CRC hash if available,
   replaced =
@@ -808,8 +827,8 @@ Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen,
   // Never build filters after repartitioning, as all rows have already been added to the
   // filters during the level0 build. Note that the first argument of this function is the
   // return value.
-  Value* build_filters_arg = codegen->GetArgument(process_build_batch_fn, 4);
-  build_filters_arg->replaceAllUsesWith(
+  Value* build_filter_arg = codegen->GetArgument(process_build_batch_fn, 4);
+  build_filter_arg->replaceAllUsesWith(
       ConstantInt::get(Type::getInt1Ty(codegen->context()), false));
 
   // Finalize ProcessBuildBatch functions
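
CodegenProcessBuildBatch() pins the build_filters argument with
replaceAllUsesWith() so the level-0 function bakes in whether any filters
exist and the repartitioning function bakes in false, letting LLVM's
optimization passes prune the dead branch. A sketch of the same trick against
the LLVM C++ API; the helper name and the argument index are illustrative, not
Impala's code.

#include <iterator>

#include "llvm/IR/Argument.h"
#include "llvm/IR/Constants.h"
#include "llvm/IR/Function.h"
#include "llvm/IR/Type.h"

// Pin the i1 argument at 'arg_idx' of 'fn' to a compile-time constant. After
// this, passes such as SimplifyCFG can fold and delete the branch testing it.
void PinBoolArgument(llvm::Function* fn, unsigned arg_idx, bool value) {
  llvm::Function::arg_iterator it = fn->arg_begin();
  std::advance(it, arg_idx);
  llvm::Argument* arg = &*it;
  llvm::Constant* constant =
      llvm::ConstantInt::get(llvm::Type::getInt1Ty(fn->getContext()), value);
  arg->replaceAllUsesWith(constant);  // every use of the arg now sees the constant
}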
