http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/unnest-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/unnest-node.cc b/be/src/exec/unnest-node.cc
index bf467bd..5bde655 100644
--- a/be/src/exec/unnest-node.cc
+++ b/be/src/exec/unnest-node.cc
@@ -18,7 +18,7 @@
 #include "common/status.h"
 #include "exec/unnest-node.h"
 #include "exec/subplan-node.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "exprs/slot-ref.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -33,31 +33,40 @@ UnnestNode::UnnestNode(ObjectPool* pool, const TPlanNode& 
tnode,
     const DescriptorTbl& descs)
   : ExecNode(pool, tnode, descs),
     item_byte_size_(0),
-    coll_expr_ctx_(NULL),
-    coll_slot_desc_(NULL),
+    thrift_coll_expr_(tnode.unnest_node.collection_expr),
+    coll_expr_(nullptr),
+    coll_expr_eval_(nullptr),
+    coll_slot_desc_(nullptr),
     coll_tuple_idx_(-1),
-    coll_value_(NULL),
+    coll_value_(nullptr),
     item_idx_(0),
     num_collections_(0),
     total_collection_size_(0),
     max_collection_size_(-1),
     min_collection_size_(-1),
-    avg_collection_size_counter_(NULL),
-    max_collection_size_counter_(NULL),
-    min_collection_size_counter_(NULL),
-    num_collections_counter_(NULL) {
+    avg_collection_size_counter_(nullptr),
+    max_collection_size_counter_(nullptr),
+    min_collection_size_counter_(nullptr),
+    num_collections_counter_(nullptr) {
 }
 
 Status UnnestNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
   DCHECK(tnode.__isset.unnest_node);
-  Expr::CreateExprTree(pool_, tnode.unnest_node.collection_expr, 
&coll_expr_ctx_);
+  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
+  return Status::OK();
+}
+
+Status UnnestNode::InitCollExpr(RuntimeState* state) {
+  DCHECK(containing_subplan_ != nullptr)
+      << "set_containing_subplan() must have been called";
+  const RowDescriptor& row_desc = containing_subplan_->child(0)->row_desc();
+  RETURN_IF_ERROR(ScalarExpr::Create(thrift_coll_expr_, row_desc, state, 
&coll_expr_));
+  DCHECK(coll_expr_->IsSlotRef());
   return Status::OK();
 }
 
 Status UnnestNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
-  DCHECK(containing_subplan_ != NULL) << "set_containing_subplan() must be 
called";
   RETURN_IF_ERROR(ExecNode::Prepare(state));
 
   avg_collection_size_counter_ =
@@ -71,17 +80,18 @@ Status UnnestNode::Prepare(RuntimeState* state) {
 
   DCHECK_EQ(1, row_desc().tuple_descriptors().size());
   const TupleDescriptor* item_tuple_desc = row_desc().tuple_descriptors()[0];
-  DCHECK(item_tuple_desc != NULL);
+  DCHECK(item_tuple_desc != nullptr);
   item_byte_size_ = item_tuple_desc->byte_size();
-  RETURN_IF_ERROR(coll_expr_ctx_->Prepare(
-      state, containing_subplan_->child(0)->row_desc(), expr_mem_tracker()));
+
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(*coll_expr_, state, pool_,
+      expr_mem_pool(), &coll_expr_eval_));
 
   // Set the coll_slot_desc_ and the corresponding tuple index used for 
manually
   // evaluating the collection SlotRef and for projection.
-  DCHECK(coll_expr_ctx_->root()->is_slotref());
-  const SlotRef* slot_ref = static_cast<SlotRef*>(coll_expr_ctx_->root());
+  DCHECK(coll_expr_->IsSlotRef());
+  const SlotRef* slot_ref = static_cast<SlotRef*>(coll_expr_);
   coll_slot_desc_ = state->desc_tbl().GetSlotDescriptor(slot_ref->slot_id());
-  DCHECK(coll_slot_desc_ != NULL);
+  DCHECK(coll_slot_desc_ != nullptr);
   const RowDescriptor& row_desc = containing_subplan_->child(0)->row_desc();
   coll_tuple_idx_ = row_desc.GetTupleIdx(coll_slot_desc_->parent()->id());
 
@@ -91,17 +101,17 @@ Status UnnestNode::Prepare(RuntimeState* state) {
 Status UnnestNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
-  RETURN_IF_ERROR(coll_expr_ctx_->Open(state));
+  RETURN_IF_ERROR(coll_expr_eval_->Open(state));
 
-  DCHECK(containing_subplan_->current_row() != NULL);
+  DCHECK(containing_subplan_->current_row() != nullptr);
   Tuple* tuple = 
containing_subplan_->current_input_row_->GetTuple(coll_tuple_idx_);
-  if (tuple != NULL) {
+  if (tuple != nullptr) {
     // Retrieve the collection value to be unnested directly from the tuple. 
We purposely
     // ignore the null bit of the slot because we may have set it in a 
previous Open() of
     // this same unnest node for projection.
     coll_value_ = reinterpret_cast<const CollectionValue*>(
         tuple->GetSlot(coll_slot_desc_->tuple_offset()));
-    // Projection: Set the slot containing the collection value to NULL.
+    // Projection: Set the slot containing the collection value to nullptr.
     tuple->SetNull(coll_slot_desc_->null_indicator_offset());
   } else {
     coll_value_ = &EMPTY_COLLECTION_VALUE;
@@ -134,7 +144,7 @@ Status UnnestNode::GetNext(RuntimeState* state, RowBatch* 
row_batch, bool* eos)
   *eos = false;
 
   // Populate the output row_batch with tuples from the collection.
-  DCHECK(coll_value_ != NULL);
+  DCHECK(coll_value_ != nullptr);
   DCHECK_GE(coll_value_->num_tuples, 0);
   while (item_idx_ < coll_value_->num_tuples) {
     Tuple* item =
@@ -144,7 +154,8 @@ Status UnnestNode::GetNext(RuntimeState* state, RowBatch* 
row_batch, bool* eos)
     TupleRow* row = row_batch->GetRow(row_idx);
     row->SetTuple(0, item);
     // TODO: Ideally these should be evaluated by the parent scan node.
-    if (EvalConjuncts(&conjunct_ctxs_[0], conjunct_ctxs_.size(), row)) {
+    DCHECK_EQ(conjuncts_.size(), conjunct_evals_.size());
+    if (EvalConjuncts(conjunct_evals_.data(), conjuncts_.size(), row)) {
       row_batch->CommitLastRow();
       // The limit is handled outside of this loop.
       if (row_batch->AtCapacity()) break;
@@ -171,8 +182,8 @@ Status UnnestNode::Reset(RuntimeState* state) {
 
 void UnnestNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  DCHECK(coll_expr_ctx_ != NULL);
-  coll_expr_ctx_->Close(state);
+  if (coll_expr_eval_ != nullptr) coll_expr_eval_->Close(state);
+  if (coll_expr_ != nullptr) coll_expr_->Close();
   ExecNode::Close(state);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/unnest-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/unnest-node.h b/be/src/exec/unnest-node.h
index 1aeca74..247dd76 100644
--- a/be/src/exec/unnest-node.h
+++ b/be/src/exec/unnest-node.h
@@ -19,7 +19,8 @@
 #define IMPALA_EXEC_UNNEST_NODE_H_
 
 #include "exec/exec-node.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
+#include "runtime/collection-value.h"
 
 namespace impala {
 
@@ -61,6 +62,10 @@ class UnnestNode : public ExecNode {
   virtual Status Reset(RuntimeState* state);
   virtual void Close(RuntimeState* state);
 
+  /// Initializes the expression which produces the collection to be unnested.
+  /// Called by the containing subplan node.
+  Status InitCollExpr(RuntimeState* state);
+
  private:
   friend class SubplanNode;
 
@@ -72,9 +77,11 @@ class UnnestNode : public ExecNode {
   /// Expr that produces the collection to be unnested. Currently always a 
SlotRef into an
   /// collection-typed slot. We do not evaluate this expr for setting 
coll_value_, but
   /// instead manually retrieve the slot value to support projection (see 
class comment).
-  ExprContext* coll_expr_ctx_;
+  const TExpr& thrift_coll_expr_;
+  ScalarExpr* coll_expr_;
+  ScalarExprEvaluator* coll_expr_eval_;
 
-  /// Descriptor of the collection-typed slot referenced by coll_expr_ctx_. 
Set in
+  /// Descriptor of the collection-typed slot referenced by coll_expr_eval_. 
Set in
   /// Prepare().  This slot is always set to NULL in Open() as a simple 
projection.
   const SlotDescriptor* coll_slot_desc_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index 660b951..1fe3bcb 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -23,7 +23,9 @@ set(LIBRARY_OUTPUT_PATH 
"${BUILD_OUTPUT_ROOT_DIRECTORY}/exprs")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exprs")
 
 add_library(Exprs
+  agg-fn.cc
   agg-fn-evaluator.cc
+  agg-fn-evaluator-ir.cc
   aggregate-functions-ir.cc
   anyval-util.cc
   bit-byte-functions-ir.cc
@@ -36,8 +38,6 @@ add_library(Exprs
   decimal-functions-ir.cc
   decimal-operators-ir.cc
   expr.cc
-  expr-context.cc
-  expr-ir.cc
   hive-udf-call.cc
   in-predicate-ir.cc
   is-not-empty-predicate.cc
@@ -49,6 +49,9 @@ add_library(Exprs
   math-functions-ir.cc
   null-literal.cc
   operators-ir.cc
+  scalar-expr.cc
+  scalar-expr-evaluator.cc
+  scalar-expr-ir.cc
   slot-ref.cc
   string-functions-ir.cc
   timestamp-functions.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/agg-fn-evaluator-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator-ir.cc 
b/be/src/exprs/agg-fn-evaluator-ir.cc
new file mode 100644
index 0000000..407bdcb
--- /dev/null
+++ b/be/src/exprs/agg-fn-evaluator-ir.cc
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/agg-fn-evaluator.h"
+
+using namespace impala;
+
+FunctionContext* AggFnEvaluator::agg_fn_ctx() const {
+  return agg_fn_ctx_.get();
+}
+
+ScalarExprEvaluator* const* AggFnEvaluator::input_evals() const {
+  return input_evals_.data();
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/agg-fn-evaluator.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator.cc b/be/src/exprs/agg-fn-evaluator.cc
index 81c0277..910eef1 100644
--- a/be/src/exprs/agg-fn-evaluator.cc
+++ b/be/src/exprs/agg-fn-evaluator.cc
@@ -24,7 +24,8 @@
 #include "exec/aggregation-node.h"
 #include "exprs/aggregate-functions.h"
 #include "exprs/anyval-util.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "exprs/scalar-fn-call.h"
 #include "runtime/lib-cache.h"
 #include "runtime/raw-value.h"
@@ -68,186 +69,134 @@ typedef StringVal (*SerializeFn)(FunctionContext*, const 
StringVal&);
 typedef AnyVal (*GetValueFn)(FunctionContext*, const AnyVal&);
 typedef AnyVal (*FinalizeFn)(FunctionContext*, const AnyVal&);
 
-Status AggFnEvaluator::Create(ObjectPool* pool, const TExpr& desc,
-    AggFnEvaluator** result) {
-  return Create(pool, desc, false, result);
+const char* AggFnEvaluator::LLVM_CLASS_NAME = "class.impala::AggFnEvaluator";
+
+AggFnEvaluator::AggFnEvaluator(const AggFn& agg_fn, MemPool* mem_pool, bool 
is_clone)
+  : is_clone_(is_clone),
+    agg_fn_(agg_fn),
+    mem_pool_(mem_pool) {
 }
 
-Status AggFnEvaluator::Create(ObjectPool* pool, const TExpr& desc,
-    bool is_analytic_fn, AggFnEvaluator** result) {
-  DCHECK_GT(desc.nodes.size(), 0);
-  *result = pool->Add(new AggFnEvaluator(desc.nodes[0], is_analytic_fn));
-  int node_idx = 0;
-  for (int i = 0; i < desc.nodes[0].num_children; ++i) {
-    ++node_idx;
-    Expr* expr = NULL;
-    ExprContext* ctx = NULL;
-    RETURN_IF_ERROR(Expr::CreateTreeFromThrift(
-        pool, desc.nodes, NULL, &node_idx, &expr, &ctx));
-    (*result)->input_expr_ctxs_.push_back(ctx);
-  }
-  return Status::OK();
+AggFnEvaluator::~AggFnEvaluator() {
+  DCHECK(closed_);
 }
 
-AggFnEvaluator::AggFnEvaluator(const TExprNode& desc, bool is_analytic_fn)
-  : fn_(desc.fn),
-    is_merge_(desc.agg_expr.is_merge_agg),
-    is_analytic_fn_(is_analytic_fn),
-    intermediate_slot_desc_(NULL),
-    output_slot_desc_(NULL),
-    arg_type_descs_(AnyValUtil::ColumnTypesToTypeDescs(
-        ColumnType::FromThrift(desc.agg_expr.arg_types))),
-    cache_entry_(NULL),
-    init_fn_(NULL),
-    update_fn_(NULL),
-    remove_fn_(NULL),
-    merge_fn_(NULL),
-    serialize_fn_(NULL),
-    get_value_fn_(NULL),
-    finalize_fn_(NULL) {
-  DCHECK(desc.fn.__isset.aggregate_fn);
-  DCHECK(desc.node_type == TExprNodeType::AGGREGATE_EXPR);
-  // TODO: remove. See comment with AggregationOp
-  if (fn_.name.function_name == "count") {
-    agg_op_ = COUNT;
-  } else if (fn_.name.function_name == "min") {
-    agg_op_ = MIN;
-  } else if (fn_.name.function_name == "max") {
-    agg_op_ = MAX;
-  } else if (fn_.name.function_name == "sum") {
-    agg_op_ = SUM;
-  } else if (fn_.name.function_name == "avg") {
-    agg_op_ = AVG;
-  } else if (fn_.name.function_name == "ndv" ||
-      fn_.name.function_name == "ndv_no_finalize") {
-    agg_op_ = NDV;
-  } else {
-    agg_op_ = OTHER;
-  }
+const SlotDescriptor& AggFnEvaluator::intermediate_slot_desc() const {
+  return agg_fn_.intermediate_slot_desc();
 }
 
-AggFnEvaluator::~AggFnEvaluator() {
-  DCHECK(cache_entry_ == NULL) << "Need to call Close()";
+const ColumnType& AggFnEvaluator::intermediate_type() const {
+  return agg_fn_.intermediate_type();
 }
 
-Status AggFnEvaluator::Prepare(RuntimeState* state, const RowDescriptor& desc,
-      const SlotDescriptor* intermediate_slot_desc,
-      const SlotDescriptor* output_slot_desc,
-      MemPool* agg_fn_pool, FunctionContext** agg_fn_ctx) {
-  DCHECK(intermediate_slot_desc != NULL);
-  DCHECK_EQ(intermediate_slot_desc->type().type,
-      ColumnType::FromThrift(fn_.aggregate_fn.intermediate_type).type);
-  DCHECK(intermediate_slot_desc_ == NULL);
-  intermediate_slot_desc_ = intermediate_slot_desc;
-
-  DCHECK(output_slot_desc != NULL);
-  DCHECK_EQ(output_slot_desc->type().type, 
ColumnType::FromThrift(fn_.ret_type).type);
-  DCHECK(output_slot_desc_ == NULL);
-  output_slot_desc_ = output_slot_desc;
-
-  RETURN_IF_ERROR(
-      Expr::Prepare(input_expr_ctxs_, state, desc, 
agg_fn_pool->mem_tracker()));
-
-  for (int i = 0; i < input_expr_ctxs_.size(); ++i) {
+Status AggFnEvaluator::Create(const AggFn& agg_fn, RuntimeState* state, 
ObjectPool* pool,
+    MemPool* mem_pool, AggFnEvaluator** result) {
+  *result = nullptr;
+
+  // Create a new AggFn evaluator.
+  AggFnEvaluator* agg_fn_eval = pool->Add(new AggFnEvaluator(agg_fn, mem_pool, 
false));
+  agg_fn_eval->agg_fn_ctx_.reset(FunctionContextImpl::CreateContext(state, 
mem_pool,
+      agg_fn.GetIntermediateTypeDesc(), agg_fn.GetOutputTypeDesc(),
+      agg_fn.arg_type_descs()));
+
+  Status status;
+  // Create the evaluators for the input expressions.
+  for (const ScalarExpr* input_expr : agg_fn.children()) {
+    ScalarExprEvaluator* input_eval;
+    status = ScalarExprEvaluator::Create(*input_expr, state, pool, mem_pool, 
&input_eval);
+    if (UNLIKELY(!status.ok())) goto cleanup;
+    agg_fn_eval->input_evals_.push_back(input_eval);
+    DCHECK(&input_eval->root() == input_expr);
+
     AnyVal* staging_input_val;
-    RETURN_IF_ERROR(
-        AllocateAnyVal(state, agg_fn_pool, input_expr_ctxs_[i]->root()->type(),
-            "Could not allocate aggregate expression input value", 
&staging_input_val));
-    staging_input_vals_.push_back(staging_input_val);
+    status = AllocateAnyVal(state, mem_pool, input_expr->type(),
+        "Could not allocate aggregate expression input value", 
&staging_input_val);
+    agg_fn_eval->staging_input_vals_.push_back(staging_input_val);
+    if (UNLIKELY(!status.ok())) goto cleanup;
   }
-  RETURN_IF_ERROR(AllocateAnyVal(state, agg_fn_pool, intermediate_type(),
+  DCHECK_EQ(agg_fn.GetNumChildren(), agg_fn_eval->input_evals_.size());
+  DCHECK_EQ(agg_fn_eval->staging_input_vals_.size(), 
agg_fn_eval->input_evals_.size());
+
+  status = AllocateAnyVal(state, mem_pool, agg_fn.intermediate_type(),
       "Could not allocate aggregate expression intermediate value",
-      &staging_intermediate_val_));
-  RETURN_IF_ERROR(AllocateAnyVal(state, agg_fn_pool, intermediate_type(),
+      &(agg_fn_eval->staging_intermediate_val_));
+  if (UNLIKELY(!status.ok())) goto cleanup;
+  status = AllocateAnyVal(state, mem_pool, agg_fn.intermediate_type(),
       "Could not allocate aggregate expression merge input value",
-      &staging_merge_input_val_));
+      &(agg_fn_eval->staging_merge_input_val_));
+  if (UNLIKELY(!status.ok())) goto cleanup;
 
-  if (is_merge_) {
-    DCHECK_EQ(staging_input_vals_.size(), 1) << "Merge should only have 1 
input.";
+  if (agg_fn.is_merge()) {
+    DCHECK_EQ(agg_fn_eval->staging_input_vals_.size(), 1)
+        << "Merge should only have 1 input.";
   }
+  *result = agg_fn_eval;
+  return Status::OK();
 
-  // Load the function pointers. Merge is not required if this is evaluating an
-  // analytic function.
-  if (fn_.aggregate_fn.init_fn_symbol.empty() ||
-      fn_.aggregate_fn.update_fn_symbol.empty() ||
-      (!is_analytic_fn_ && fn_.aggregate_fn.merge_fn_symbol.empty())) {
-    // This path is only for partially implemented builtins.
-    DCHECK_EQ(fn_.binary_type, TFunctionBinaryType::BUILTIN);
-    stringstream ss;
-    ss << "Function " << fn_.name.function_name << " is not implemented.";
-    return Status(ss.str());
-  }
-
-  RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(
-      fn_.hdfs_location, fn_.aggregate_fn.init_fn_symbol, &init_fn_, 
&cache_entry_));
-  RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(
-      fn_.hdfs_location, fn_.aggregate_fn.update_fn_symbol, &update_fn_, 
&cache_entry_));
-
-  // Merge() is not loaded if evaluating the agg fn as an analytic function.
-  if (!is_analytic_fn_) {
-    RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
-          fn_.aggregate_fn.merge_fn_symbol, &merge_fn_, &cache_entry_));
-  }
+cleanup:
+  DCHECK(!status.ok());
+  agg_fn_eval->Close(state);
+  return status;
+}
 
-  // Serialize(), GetValue(), Remove() and Finalize() are optional
-  if (!fn_.aggregate_fn.serialize_fn_symbol.empty()) {
-    RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(
-        fn_.hdfs_location, fn_.aggregate_fn.serialize_fn_symbol, 
&serialize_fn_,
-        &cache_entry_));
-  }
-  if (!fn_.aggregate_fn.get_value_fn_symbol.empty()) {
-    RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(
-        fn_.hdfs_location, fn_.aggregate_fn.get_value_fn_symbol, 
&get_value_fn_,
-        &cache_entry_));
-  }
-  if (!fn_.aggregate_fn.remove_fn_symbol.empty()) {
-    RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
-        fn_.aggregate_fn.remove_fn_symbol, &remove_fn_, &cache_entry_));
+Status AggFnEvaluator::Create(const vector<AggFn*>& agg_fns, RuntimeState* 
state,
+    ObjectPool* pool, MemPool* mem_pool, vector<AggFnEvaluator*>* evals) {
+  for (const AggFn* agg_fn : agg_fns) {
+    AggFnEvaluator* agg_fn_eval;
+    RETURN_IF_ERROR(AggFnEvaluator::Create(*agg_fn, state, pool, mem_pool, 
&agg_fn_eval));
+    evals->push_back(agg_fn_eval);
   }
-  if (!fn_.aggregate_fn.finalize_fn_symbol.empty()) {
-    RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
-        fn_.aggregate_fn.finalize_fn_symbol, &finalize_fn_, &cache_entry_));
-  }
-  *agg_fn_ctx = FunctionContextImpl::CreateContext(state, agg_fn_pool,
-      GetIntermediateTypeDesc(), GetOutputTypeDesc(), arg_type_descs_);
   return Status::OK();
 }
 
-Status AggFnEvaluator::Open(RuntimeState* state, FunctionContext* agg_fn_ctx) {
-  RETURN_IF_ERROR(Expr::Open(input_expr_ctxs_, state));
+Status AggFnEvaluator::Open(RuntimeState* state) {
+  if (opened_) return Status::OK();
+  opened_ = true;
+  RETURN_IF_ERROR(ScalarExprEvaluator::Open(input_evals_, state));
   // Now that we have opened all our input exprs, it is safe to evaluate any 
constant
   // values for the UDA's FunctionContext (we cannot evaluate exprs before 
calling Open()
   // on them).
-  vector<AnyVal*> constant_args(input_expr_ctxs_.size());
-  for (int i = 0; i < input_expr_ctxs_.size(); ++i) {
-    ExprContext* input_ctx = input_expr_ctxs_[i];
-    AnyVal* const_val;
-    RETURN_IF_ERROR(input_ctx->root()->GetConstVal(state, input_ctx, 
&const_val));
-    constant_args[i] = const_val;
+  vector<AnyVal*> constant_args(input_evals_.size(), nullptr);
+  for (int i = 0; i < input_evals_.size(); ++i) {
+    ScalarExprEvaluator* eval = input_evals_[i];
+    RETURN_IF_ERROR(eval->GetConstValue(state, *(agg_fn_.GetChild(i)),
+        &constant_args[i]));
   }
-  agg_fn_ctx->impl()->SetConstantArgs(move(constant_args));
+  agg_fn_ctx_->impl()->SetConstantArgs(move(constant_args));
+  return Status::OK();
+}
+
+Status AggFnEvaluator::Open(
+    const vector<AggFnEvaluator*>& evals, RuntimeState* state) {
+  for (AggFnEvaluator* eval : evals) RETURN_IF_ERROR(eval->Open(state));
   return Status::OK();
 }
 
 void AggFnEvaluator::Close(RuntimeState* state) {
-  Expr::Close(input_expr_ctxs_, state);
+  if (closed_) return;
+  closed_ = true;
+  if (!is_clone_) ScalarExprEvaluator::Close(input_evals_, state);
+  FreeLocalAllocations();
+  agg_fn_ctx_->impl()->Close();
+  agg_fn_ctx_.reset();
+  input_evals_.clear();
+}
 
-  if (cache_entry_ != NULL) {
-    LibCache::instance()->DecrementUseCount(cache_entry_);
-    cache_entry_ = NULL;
-  }
+void AggFnEvaluator::Close(
+    const vector<AggFnEvaluator*>& evals, RuntimeState* state) {
+  for (AggFnEvaluator* eval : evals) eval->Close(state);
 }
 
-inline void AggFnEvaluator::SetDstSlot(FunctionContext* ctx, const AnyVal* src,
-    const SlotDescriptor* dst_slot_desc, Tuple* dst) {
+void AggFnEvaluator::SetDstSlot(const AnyVal* src, const SlotDescriptor& 
dst_slot_desc,
+    Tuple* dst) {
   if (src->is_null) {
-    dst->SetNull(dst_slot_desc->null_indicator_offset());
+    dst->SetNull(dst_slot_desc.null_indicator_offset());
     return;
   }
 
-  dst->SetNotNull(dst_slot_desc->null_indicator_offset());
-  void* slot = dst->GetSlot(dst_slot_desc->tuple_offset());
-  switch (dst_slot_desc->type().type) {
+  dst->SetNotNull(dst_slot_desc.null_indicator_offset());
+  void* slot = dst->GetSlot(dst_slot_desc.tuple_offset());
+  switch (dst_slot_desc.type().type) {
     case TYPE_NULL:
       return;
     case TYPE_BOOLEAN:
@@ -278,7 +227,7 @@ inline void AggFnEvaluator::SetDstSlot(FunctionContext* 
ctx, const AnyVal* src,
       return;
     case TYPE_CHAR:
       if (slot != reinterpret_cast<const StringVal*>(src)->ptr) {
-        ctx->SetError("UDA should not set pointer of CHAR(N) intermediate");
+        agg_fn_ctx_->SetError("UDA should not set pointer of CHAR(N) 
intermediate");
       }
       return;
     case TYPE_TIMESTAMP:
@@ -286,7 +235,7 @@ inline void AggFnEvaluator::SetDstSlot(FunctionContext* 
ctx, const AnyVal* src,
           *reinterpret_cast<const TimestampVal*>(src));
       return;
     case TYPE_DECIMAL:
-      switch (dst_slot_desc->type().GetByteSize()) {
+      switch (dst_slot_desc.type().GetByteSize()) {
         case 4:
           *reinterpret_cast<int32_t*>(slot) =
               reinterpret_cast<const DecimalVal*>(src)->val4;
@@ -310,96 +259,101 @@ inline void AggFnEvaluator::SetDstSlot(FunctionContext* 
ctx, const AnyVal* src,
           break;
       }
     default:
-      DCHECK(false) << "NYI: " << dst_slot_desc->type();
+      DCHECK(false) << "NYI: " << dst_slot_desc.type();
   }
 }
 
 // This function would be replaced in codegen.
-void AggFnEvaluator::Init(FunctionContext* agg_fn_ctx, Tuple* dst) {
-  DCHECK(init_fn_ != NULL);
-  for (ExprContext* ctx : input_expr_ctxs_) DCHECK(ctx->opened());
+void AggFnEvaluator::Init(Tuple* dst) {
+  DCHECK(opened_);
+  DCHECK(agg_fn_.init_fn_ != nullptr);
+  for (ScalarExprEvaluator* input_eval : input_evals_) {
+    DCHECK(input_eval->opened());
+  }
 
-  if (intermediate_type().type == TYPE_CHAR) {
+  const ColumnType& type = intermediate_type();
+  const SlotDescriptor& slot_desc = intermediate_slot_desc();
+  if (type.type == TYPE_CHAR) {
     // For type char, we want to initialize the staging_intermediate_val_ with
     // a pointer into the tuple (the UDA should not be allocating it).
-    void* slot = dst->GetSlot(intermediate_slot_desc_->tuple_offset());
+    void* slot = dst->GetSlot(slot_desc.tuple_offset());
     StringVal* sv = reinterpret_cast<StringVal*>(staging_intermediate_val_);
-    sv->is_null = 
dst->IsNull(intermediate_slot_desc_->null_indicator_offset());
-    sv->ptr = reinterpret_cast<uint8_t*>(
-        StringValue::CharSlotToPtr(slot, intermediate_type()));
-    sv->len = intermediate_type().len;
+    sv->is_null = dst->IsNull(slot_desc.null_indicator_offset());
+    sv->ptr = reinterpret_cast<uint8_t*>(StringValue::CharSlotToPtr(slot, 
type));
+    sv->len = type.len;
   }
-  reinterpret_cast<InitFn>(init_fn_)(agg_fn_ctx, staging_intermediate_val_);
-  SetDstSlot(agg_fn_ctx, staging_intermediate_val_, intermediate_slot_desc_, 
dst);
-  agg_fn_ctx->impl()->set_num_updates(0);
-  agg_fn_ctx->impl()->set_num_removes(0);
+  reinterpret_cast<InitFn>(agg_fn_.init_fn_)(
+       agg_fn_ctx_.get(), staging_intermediate_val_);
+  SetDstSlot(staging_intermediate_val_, slot_desc, dst);
+  agg_fn_ctx_->impl()->set_num_updates(0);
+  agg_fn_ctx_->impl()->set_num_removes(0);
 }
 
-static void SetAnyVal(const SlotDescriptor* desc, Tuple* tuple, AnyVal* dst) {
-  bool is_null = tuple->IsNull(desc->null_indicator_offset());
-  void* slot = NULL;
-  if (!is_null) slot = tuple->GetSlot(desc->tuple_offset());
-  AnyValUtil::SetAnyVal(slot, desc->type(), dst);
+static void SetAnyVal(const SlotDescriptor& desc, Tuple* tuple, AnyVal* dst) {
+  bool is_null = tuple->IsNull(desc.null_indicator_offset());
+  void* slot = nullptr;
+  if (!is_null) slot = tuple->GetSlot(desc.tuple_offset());
+  AnyValUtil::SetAnyVal(slot, desc.type(), dst);
 }
 
-void AggFnEvaluator::Update(
-    FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst, void* fn) {
-  if (fn == NULL) return;
+void AggFnEvaluator::Update(const TupleRow* row, Tuple* dst, void* fn) {
+  if (fn == nullptr) return;
 
-  SetAnyVal(intermediate_slot_desc_, dst, staging_intermediate_val_);
+  const SlotDescriptor& slot_desc = intermediate_slot_desc();
+  SetAnyVal(slot_desc, dst, staging_intermediate_val_);
 
-  for (int i = 0; i < input_expr_ctxs_.size(); ++i) {
-    void* src_slot = input_expr_ctxs_[i]->GetValue(row);
-    AnyValUtil::SetAnyVal(
-        src_slot, input_expr_ctxs_[i]->root()->type(), staging_input_vals_[i]);
+  for (int i = 0; i < input_evals_.size(); ++i) {
+    void* src_slot = input_evals_[i]->GetValue(row);
+    DCHECK(&input_evals_[i]->root() == agg_fn_.GetChild(i));
+    AnyValUtil::SetAnyVal(src_slot, agg_fn_.GetChild(i)->type(), 
staging_input_vals_[i]);
   }
 
   // TODO: this part is not so good and not scalable. It can be replaced with
   // codegen but we can also consider leaving it for the first few cases for
   // debugging.
-  switch (input_expr_ctxs_.size()) {
+  switch (input_evals_.size()) {
     case 0:
-      reinterpret_cast<UpdateFn0>(fn)(agg_fn_ctx, staging_intermediate_val_);
+      reinterpret_cast<UpdateFn0>(fn)(agg_fn_ctx_.get(), 
staging_intermediate_val_);
       break;
     case 1:
-      reinterpret_cast<UpdateFn1>(fn)(agg_fn_ctx,
+      reinterpret_cast<UpdateFn1>(fn)(agg_fn_ctx_.get(),
           *staging_input_vals_[0], staging_intermediate_val_);
       break;
     case 2:
-      reinterpret_cast<UpdateFn2>(fn)(agg_fn_ctx,
+      reinterpret_cast<UpdateFn2>(fn)(agg_fn_ctx_.get(),
           *staging_input_vals_[0], *staging_input_vals_[1], 
staging_intermediate_val_);
       break;
     case 3:
-      reinterpret_cast<UpdateFn3>(fn)(agg_fn_ctx,
+      reinterpret_cast<UpdateFn3>(fn)(agg_fn_ctx_.get(),
           *staging_input_vals_[0], *staging_input_vals_[1],
           *staging_input_vals_[2], staging_intermediate_val_);
       break;
     case 4:
-      reinterpret_cast<UpdateFn4>(fn)(agg_fn_ctx,
+      reinterpret_cast<UpdateFn4>(fn)(agg_fn_ctx_.get(),
           *staging_input_vals_[0], *staging_input_vals_[1],
           *staging_input_vals_[2], *staging_input_vals_[3], 
staging_intermediate_val_);
       break;
     case 5:
-      reinterpret_cast<UpdateFn5>(fn)(agg_fn_ctx,
+      reinterpret_cast<UpdateFn5>(fn)(agg_fn_ctx_.get(),
           *staging_input_vals_[0], *staging_input_vals_[1],
           *staging_input_vals_[2], *staging_input_vals_[3],
           *staging_input_vals_[4], staging_intermediate_val_);
       break;
     case 6:
-      reinterpret_cast<UpdateFn6>(fn)(agg_fn_ctx,
+      reinterpret_cast<UpdateFn6>(fn)(agg_fn_ctx_.get(),
           *staging_input_vals_[0], *staging_input_vals_[1],
           *staging_input_vals_[2], *staging_input_vals_[3],
           *staging_input_vals_[4], *staging_input_vals_[5], 
staging_intermediate_val_);
       break;
     case 7:
-      reinterpret_cast<UpdateFn7>(fn)(agg_fn_ctx,
+      reinterpret_cast<UpdateFn7>(fn)(agg_fn_ctx_.get(),
           *staging_input_vals_[0], *staging_input_vals_[1],
           *staging_input_vals_[2], *staging_input_vals_[3],
           *staging_input_vals_[4], *staging_input_vals_[5],
           *staging_input_vals_[6], staging_intermediate_val_);
       break;
     case 8:
-      reinterpret_cast<UpdateFn8>(fn)(agg_fn_ctx,
+      reinterpret_cast<UpdateFn8>(fn)(agg_fn_ctx_.get(),
           *staging_input_vals_[0], *staging_input_vals_[1],
           *staging_input_vals_[2], *staging_input_vals_[3],
           *staging_input_vals_[4], *staging_input_vals_[5],
@@ -409,101 +363,113 @@ void AggFnEvaluator::Update(
     default:
       DCHECK(false) << "NYI";
   }
-  SetDstSlot(agg_fn_ctx, staging_intermediate_val_, intermediate_slot_desc_, 
dst);
+  SetDstSlot(staging_intermediate_val_, slot_desc, dst);
 }
 
-void AggFnEvaluator::Merge(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* 
dst) {
-  DCHECK(merge_fn_ != NULL);
+void AggFnEvaluator::Merge(Tuple* src, Tuple* dst) {
+  DCHECK(agg_fn_.merge_fn_ != nullptr);
 
-  SetAnyVal(intermediate_slot_desc_, dst, staging_intermediate_val_);
-  SetAnyVal(intermediate_slot_desc_, src, staging_merge_input_val_);
+  const SlotDescriptor& slot_desc = intermediate_slot_desc();
+  SetAnyVal(slot_desc, dst, staging_intermediate_val_);
+  SetAnyVal(slot_desc, src, staging_merge_input_val_);
 
   // The merge fn always takes one input argument.
-  reinterpret_cast<UpdateFn1>(merge_fn_)(agg_fn_ctx,
+  reinterpret_cast<UpdateFn1>(agg_fn_.merge_fn_)(agg_fn_ctx_.get(),
       *staging_merge_input_val_, staging_intermediate_val_);
-  SetDstSlot(agg_fn_ctx, staging_intermediate_val_, intermediate_slot_desc_, 
dst);
+  SetDstSlot(staging_intermediate_val_, slot_desc, dst);
 }
 
-void AggFnEvaluator::SerializeOrFinalize(FunctionContext* agg_fn_ctx, Tuple* 
src,
-    const SlotDescriptor* dst_slot_desc, Tuple* dst, void* fn) {
+void AggFnEvaluator::SerializeOrFinalize(Tuple* src,
+    const SlotDescriptor& dst_slot_desc, Tuple* dst, void* fn) {
   // No fn was given and the src and dst are identical. Nothing to be done.
-  if (fn == NULL && src == dst) return;
+  if (fn == nullptr && src == dst) return;
   // src != dst means we are performing a Finalize(), so even if fn == null we
   // still must copy the value of the src slot into dst.
 
-  bool src_slot_null = 
src->IsNull(intermediate_slot_desc_->null_indicator_offset());
-  void* src_slot = NULL;
-  if (!src_slot_null) src_slot = 
src->GetSlot(intermediate_slot_desc_->tuple_offset());
+  const SlotDescriptor& slot_desc = intermediate_slot_desc();
+  bool src_slot_null = src->IsNull(slot_desc.null_indicator_offset());
+  void* src_slot = nullptr;
+  if (!src_slot_null) src_slot = src->GetSlot(slot_desc.tuple_offset());
 
   // No fn was given but the src and dst tuples are different (doing a 
Finalize()).
   // Just copy the src slot into the dst tuple.
-  if (fn == NULL) {
-    DCHECK_EQ(intermediate_type(), dst_slot_desc->type());
-    RawValue::Write(src_slot, dst, dst_slot_desc, NULL);
+  if (fn == nullptr) {
+    DCHECK_EQ(intermediate_type(), dst_slot_desc.type());
+    RawValue::Write(src_slot, dst, &dst_slot_desc, nullptr);
     return;
   }
 
   AnyValUtil::SetAnyVal(src_slot, intermediate_type(), 
staging_intermediate_val_);
-  switch (dst_slot_desc->type().type) {
+  switch (dst_slot_desc.type().type) {
     case TYPE_BOOLEAN: {
       typedef BooleanVal(*Fn)(FunctionContext*, AnyVal*);
-      BooleanVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, 
staging_intermediate_val_);
-      SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
+      BooleanVal v = reinterpret_cast<Fn>(fn)(
+          agg_fn_ctx_.get(), staging_intermediate_val_);
+      SetDstSlot(&v, dst_slot_desc, dst);
       break;
     }
     case TYPE_TINYINT: {
       typedef TinyIntVal(*Fn)(FunctionContext*, AnyVal*);
-      TinyIntVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, 
staging_intermediate_val_);
-      SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
+      TinyIntVal v = reinterpret_cast<Fn>(fn)(
+          agg_fn_ctx_.get(), staging_intermediate_val_);
+      SetDstSlot(&v, dst_slot_desc, dst);
       break;
     }
     case TYPE_SMALLINT: {
       typedef SmallIntVal(*Fn)(FunctionContext*, AnyVal*);
-      SmallIntVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, 
staging_intermediate_val_);
-      SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
+      SmallIntVal v = reinterpret_cast<Fn>(fn)(
+          agg_fn_ctx_.get(), staging_intermediate_val_);
+      SetDstSlot(&v, dst_slot_desc, dst);
       break;
     }
     case TYPE_INT: {
       typedef IntVal(*Fn)(FunctionContext*, AnyVal*);
-      IntVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, 
staging_intermediate_val_);
-      SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
+      IntVal v = reinterpret_cast<Fn>(fn)(
+          agg_fn_ctx_.get(), staging_intermediate_val_);
+      SetDstSlot(&v, dst_slot_desc, dst);
       break;
     }
     case TYPE_BIGINT: {
       typedef BigIntVal(*Fn)(FunctionContext*, AnyVal*);
-      BigIntVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, 
staging_intermediate_val_);
-      SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
+      BigIntVal v = reinterpret_cast<Fn>(fn)(
+          agg_fn_ctx_.get(), staging_intermediate_val_);
+      SetDstSlot(&v, dst_slot_desc, dst);
       break;
     }
     case TYPE_FLOAT: {
       typedef FloatVal(*Fn)(FunctionContext*, AnyVal*);
-      FloatVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, 
staging_intermediate_val_);
-      SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
+      FloatVal v = reinterpret_cast<Fn>(fn)(
+          agg_fn_ctx_.get(), staging_intermediate_val_);
+      SetDstSlot(&v, dst_slot_desc, dst);
       break;
     }
     case TYPE_DOUBLE: {
       typedef DoubleVal(*Fn)(FunctionContext*, AnyVal*);
-      DoubleVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, 
staging_intermediate_val_);
-      SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
+      DoubleVal v = reinterpret_cast<Fn>(fn)(
+          agg_fn_ctx_.get(), staging_intermediate_val_);
+      SetDstSlot(&v, dst_slot_desc, dst);
       break;
     }
     case TYPE_STRING:
     case TYPE_VARCHAR: {
       typedef StringVal(*Fn)(FunctionContext*, AnyVal*);
-      StringVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, 
staging_intermediate_val_);
-      SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
+      StringVal v = reinterpret_cast<Fn>(fn)(
+          agg_fn_ctx_.get(), staging_intermediate_val_);
+      SetDstSlot(&v, dst_slot_desc, dst);
       break;
     }
     case TYPE_DECIMAL: {
       typedef DecimalVal(*Fn)(FunctionContext*, AnyVal*);
-      DecimalVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, 
staging_intermediate_val_);
-      SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
+      DecimalVal v = reinterpret_cast<Fn>(fn)(
+          agg_fn_ctx_.get(), staging_intermediate_val_);
+      SetDstSlot(&v, dst_slot_desc, dst);
       break;
     }
     case TYPE_TIMESTAMP: {
       typedef TimestampVal(*Fn)(FunctionContext*, AnyVal*);
-      TimestampVal v = reinterpret_cast<Fn>(fn)(agg_fn_ctx, 
staging_intermediate_val_);
-      SetDstSlot(agg_fn_ctx, &v, dst_slot_desc, dst);
+      TimestampVal v = reinterpret_cast<Fn>(fn)(
+          agg_fn_ctx_.get(), staging_intermediate_val_);
+      SetDstSlot(&v, dst_slot_desc, dst);
       break;
     }
     default:
@@ -511,56 +477,34 @@ void AggFnEvaluator::SerializeOrFinalize(FunctionContext* 
agg_fn_ctx, Tuple* src
   }
 }
 
-/// Gets the update or merge function for this UDA.
-Status AggFnEvaluator::GetUpdateOrMergeFunction(LlvmCodeGen* codegen, 
Function** uda_fn) {
-  const string& symbol =
-      is_merge_ ? fn_.aggregate_fn.merge_fn_symbol : 
fn_.aggregate_fn.update_fn_symbol;
-  vector<ColumnType> fn_arg_types;
-  for (ExprContext* input_expr_ctx : input_expr_ctxs_) {
-    fn_arg_types.push_back(input_expr_ctx->root()->type());
-  }
-  // The intermediate value is passed as the last argument.
-  fn_arg_types.push_back(intermediate_type());
-  RETURN_IF_ERROR(codegen->LoadFunction(fn_, symbol, NULL, fn_arg_types,
-      fn_arg_types.size(), false, uda_fn, &cache_entry_));
-
-  // Inline constants into the function body (if there is an IR body).
-  if (!(*uda_fn)->isDeclaration()) {
-    // TODO: IMPALA-4785: we should also replace references to 
GetIntermediateType()
-    // with constants.
-    codegen->InlineConstFnAttrs(GetOutputTypeDesc(), arg_type_descs_, *uda_fn);
-    *uda_fn = codegen->FinalizeFunction(*uda_fn);
-    if (*uda_fn == NULL) {
-      return Status(TErrorCode::UDF_VERIFY_FAILED, symbol, fn_.hdfs_location);
-    }
-  }
-  return Status::OK();
-}
-
-FunctionContext::TypeDesc AggFnEvaluator::GetIntermediateTypeDesc() const {
-  return AnyValUtil::ColumnTypeToTypeDesc(intermediate_slot_desc_->type());
+void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* mem_pool,
+    AggFnEvaluator** cloned_eval) const {
+  DCHECK(opened_);
+  *cloned_eval = pool->Add(new AggFnEvaluator(agg_fn_, mem_pool, true));
+  (*cloned_eval)->agg_fn_ctx_.reset(agg_fn_ctx_->impl()->Clone(mem_pool));
+  DCHECK_EQ((*cloned_eval)->input_evals_.size(), 0);
+  (*cloned_eval)->input_evals_ = input_evals_;
+  (*cloned_eval)->staging_input_vals_ = staging_input_vals_;
+  (*cloned_eval)->staging_intermediate_val_ = staging_intermediate_val_;
+  (*cloned_eval)->staging_merge_input_val_ = staging_merge_input_val_;
+  (*cloned_eval)->opened_ = true;
 }
 
-FunctionContext::TypeDesc AggFnEvaluator::GetOutputTypeDesc() const {
-  return AnyValUtil::ColumnTypeToTypeDesc(output_slot_desc_->type());
+void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* mem_pool,
+    const vector<AggFnEvaluator*>& evals,
+    vector<AggFnEvaluator*>* cloned_evals) {
+  for (const AggFnEvaluator* eval : evals) {
+    AggFnEvaluator* cloned_eval;
+    eval->ShallowClone(pool, mem_pool, &cloned_eval);
+    cloned_evals->push_back(cloned_eval);
+  }
 }
 
-string AggFnEvaluator::DebugString(const vector<AggFnEvaluator*>& exprs) {
-  stringstream out;
-  out << "[";
-  for (int i = 0; i < exprs.size(); ++i) {
-    out << (i == 0 ? "" : " ") << exprs[i]->DebugString();
-  }
-  out << "]";
-  return out.str();
+void AggFnEvaluator::FreeLocalAllocations() {
+  ScalarExprEvaluator::FreeLocalAllocations(input_evals_);
+  agg_fn_ctx_->impl()->FreeLocalAllocations();
 }
 
-string AggFnEvaluator::DebugString() const {
-  stringstream out;
-  out << "AggFnEvaluator(op=" << agg_op_;
-  for (int i = 0; i < input_expr_ctxs_.size(); ++i) {
-    out << " " << input_expr_ctxs_[i]->root()->DebugString() << ")";
-  }
-  out << ")";
-  return out.str();
+void AggFnEvaluator::FreeLocalAllocations(const vector<AggFnEvaluator*>& 
evals) {
+  for (AggFnEvaluator* eval : evals) eval->FreeLocalAllocations();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/agg-fn-evaluator.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h
index 712bf40..2ce968c 100644
--- a/be/src/exprs/agg-fn-evaluator.h
+++ b/be/src/exprs/agg-fn-evaluator.h
@@ -23,6 +23,7 @@
 
 #include <boost/scoped_ptr.hpp>
 #include "common/status.h"
+#include "exprs/agg-fn.h"
 #include "runtime/descriptors.h"
 #include "runtime/lib-cache.h"
 #include "runtime/tuple-row.h"
@@ -34,13 +35,8 @@
 #include "gen-cpp/PlanNodes_types.h"
 #include "gen-cpp/Types_types.h"
 
-using namespace impala_udf;
-
 namespace impala {
 
-class AggregationNode;
-class Expr;
-class ExprContext;
 class MemPool;
 class MemTracker;
 class ObjectPool;
@@ -51,98 +47,94 @@ class Tuple;
 class TupleRow;
 class TExprNode;
 
-/// This class evaluates aggregate functions. Aggregate functions can either be
-/// builtins or external UDAs. For both of types types, they can either use 
codegen
-/// or not.
-//
-/// This class provides an interface that's 1:1 with the UDA interface and 
serves
-/// as glue code between the TupleRow/Tuple signature used by the 
AggregationNode
-/// and the AnyVal signature of the UDA interface. It handles evaluating input
-/// slots from TupleRows and aggregating the result to the result tuple.
-//
-/// This class is not threadsafe. However, it can be used for multiple 
interleaved
-/// evaluations of the aggregation function by using multiple FunctionContexts.
+/// AggFnEvaluator is the interface for evaluating aggregate functions during 
execution.
+///
+/// AggFnEvaluator contains runtime state and implements wrapper functions 
which convert
+/// the input TupleRow into AnyVal format expected by UDAF functions defined 
in AggFn.
+/// It also evaluates TupleRow against input expressions, stores the results 
in staging
+/// input values which are passed to Update() function to update the 
intermediate value
+/// and handles the merging of intermediate values in the merge phases of 
execution.
+///
+/// This class is not threadsafe. An evaluator can be cloned to isolate 
resource
+/// consumption per partition in an aggregation node.
+///
 class AggFnEvaluator {
  public:
-  /// TODO: The aggregation node has custom codegen paths for a few of the 
builtins.
-  /// That logic needs to be removed. For now, add some enums for those 
builtins.
-  enum AggregationOp {
-    COUNT,
-    MIN,
-    MAX,
-    SUM,
-    AVG,
-    NDV,
-    OTHER,
-  };
-
-  /// Creates an AggFnEvaluator object from desc. The object is added to 'pool'
-  /// and returned in *result. This constructs the input Expr trees for
-  /// this aggregate function as specified in desc. The result is returned in
-  /// *result.
-  static Status Create(ObjectPool* pool, const TExpr& desc, AggFnEvaluator** 
result);
-
-  /// Creates an AggFnEvaluator object from desc. If is_analytic_fn, the 
evaluator is
-  /// prepared for analytic function evaluation.
-  /// TODO: Avoid parameter for analytic fns, should this be added to 
TAggregateExpr?
-  static Status Create(ObjectPool* pool, const TExpr& desc, bool 
is_analytic_fn,
-      AggFnEvaluator** result);
-
-  /// Initializes the agg expr. 'desc' must be the row descriptor for the 
input TupleRow.
-  /// It is used to get the input values in the Update() and Merge() functions.
-  /// 'intermediate_slot_desc' is the slot into which this evaluator should 
write the
-  /// results of Update()/Merge()/Serialize().
-  /// 'output_slot_desc' is the slot into which this evaluator should write 
the results
-  /// of Finalize()
-  /// 'agg_fn_ctx' will be initialized for the agg function using 
'agg_fn_pool'. Caller
-  /// is responsible for closing and deleting 'agg_fn_ctx'.
-  Status Prepare(RuntimeState* state, const RowDescriptor& desc,
-      const SlotDescriptor* intermediate_slot_desc,
-      const SlotDescriptor* output_slot_desc,
-      MemPool* agg_fn_pool, FunctionContext** agg_fn_ctx);
+  /// Creates an AggFnEvaluator object from the aggregate expression 'agg_fn'.
+  /// The evaluator is added to 'pool' and returned in 'eval'. This will also
+  /// create a single evaluator for each input expression. All allocations 
will come
+  /// from 'mem_pool'. Note that it's the responsibility to call Close() all 
evaluators
+  /// even if this function returns error status on initialization failure.
+  static Status Create(const AggFn& agg_fn, RuntimeState* state, ObjectPool* 
pool,
+      MemPool* mem_pool, AggFnEvaluator** eval) WARN_UNUSED_RESULT;
+
+  /// Convenience functions for creating evaluators for multiple aggregate 
functions.
+  static Status Create(const std::vector<AggFn*>& agg_fns, RuntimeState* state,
+      ObjectPool* pool, MemPool* mem_pool, std::vector<AggFnEvaluator*>* evals)
+      WARN_UNUSED_RESULT;
 
   ~AggFnEvaluator();
 
-  /// 'agg_fn_ctx' may be cloned after calling Open(). Note that closing all
-  /// FunctionContexts, including the original one returned by Prepare(), is 
the
-  /// responsibility of the caller.
-  Status Open(RuntimeState* state, FunctionContext* agg_fn_ctx);
-
+  /// Initializes the evaluator by calling Open() on all the input 
expressions' evaluators
+  /// and caches all constant input arguments.
+  /// TODO: Move the evaluation of constant input arguments to AggFn setup.
+  Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
+
+  /// Convenience functions for opening multiple AggFnEvaluators.
+  static Status Open(const std::vector<AggFnEvaluator*>& evals,
+      RuntimeState* state) WARN_UNUSED_RESULT;
+
+  /// Used by PartitionedAggregation node to initialize one evaluator per 
partition.
+  /// Avoid the overhead of re-initializing an evaluator (e.g. calling 
GetConstVal()
+  /// on the input expressions). Cannot be called until after Open() has been 
called.
+  /// 'cloned_eval' is a shallow copy of this evaluator: all input values, 
staging
+  /// intermediate values and merge values are shared with the original 
evaluator. Only
+  /// the FunctionContext 'agg_fn_ctx' is cloned for resource isolation per 
partition.
+  /// So, it's not safe to use cloned evaluators concurrently.
+  void ShallowClone(
+      ObjectPool* pool, MemPool* mem_pool, AggFnEvaluator** cloned_eval) const;
+
+  /// Convenience function for cloning multiple evaluators. The newly cloned 
evaluators
+  /// are appended to 'cloned_evals'.
+  static void ShallowClone(ObjectPool* pool, MemPool* mem_pool,
+      const std::vector<AggFnEvaluator*>& evals,
+      std::vector<AggFnEvaluator*>* cloned_evals);
+
+  /// Free resources owned by the evaluator.
   void Close(RuntimeState* state);
+  static void Close(const std::vector<AggFnEvaluator*>& evals, RuntimeState* 
state);
 
-  const ColumnType& intermediate_type() const { return 
intermediate_slot_desc_->type(); }
-  bool is_merge() const { return is_merge_; }
-  AggregationOp agg_op() const { return agg_op_; }
-  const std::vector<ExprContext*>& input_expr_ctxs() const { return 
input_expr_ctxs_; }
-  bool is_count_star() const { return agg_op_ == COUNT && 
input_expr_ctxs_.empty(); }
-  bool is_builtin() const { return fn_.binary_type == 
TFunctionBinaryType::BUILTIN; }
-  bool SupportsRemove() const { return remove_fn_ != NULL; }
-  bool SupportsSerialize() const { return serialize_fn_ != NULL; }
-  const std::string& fn_name() const { return fn_.name.function_name; }
-  const SlotDescriptor* output_slot_desc() const { return output_slot_desc_; }
-
-  static std::string DebugString(const std::vector<AggFnEvaluator*>& exprs);
-  std::string DebugString() const;
+  const AggFn& agg_fn() const { return agg_fn_; }
+
+  FunctionContext* IR_ALWAYS_INLINE agg_fn_ctx() const;
 
-  /// Functions for different phases of the aggregation.
-  void Init(FunctionContext* agg_fn_ctx, Tuple* dst);
+  ScalarExprEvaluator* const* IR_ALWAYS_INLINE input_evals() const;
+
+  /// Call the initialization function of the AggFn. May update 'dst'.
+  void Init(Tuple* dst);
 
   /// Updates the intermediate state dst based on adding the input src row. 
This can be
-  /// called either to drive the UDA's Update() or Merge() function depending 
on
-  /// is_merge_. That is, from the caller, it doesn't mater.
-  void Add(FunctionContext* agg_fn_ctx, const TupleRow* src, Tuple* dst);
+  /// called either to drive the UDA's Update() or Merge() function, depending 
on whether
+  /// the AggFn is a merging aggregation.
+  void Add(const TupleRow* src, Tuple* dst);
 
-  /// Updates the intermediate state dst to remove the input src row, i.e. 
undoes
+  /// Updates the intermediate state dst to remove the input src row, i.e. undo
   /// Add(src, dst). Only used internally for analytic fn builtins.
-  void Remove(FunctionContext* agg_fn_ctx, const TupleRow* src, Tuple* dst);
+  void Remove(const TupleRow* src, Tuple* dst);
 
-  /// Explicitly does a merge, even if this evalutor is not marked as merging.
+  /// Explicitly does a merge, even if this evaluator is not marked as merging.
   /// This is used by the partitioned agg node when it needs to merge spill 
results.
   /// In the non-spilling case, this node would normally not merge.
-  void Merge(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst);
+  void Merge(Tuple* src, Tuple* dst);
+
+  /// Flattens any intermediate values containing pointers, and frees any 
memory
+  /// allocated during the init, update and merge phases.
+  void Serialize(Tuple* dst);
 
-  void Serialize(FunctionContext* agg_fn_ctx, Tuple* dst);
-  void Finalize(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst);
+  /// Does one final transformation of the aggregated value in 'agg_val' and 
stores the
+  /// result in 'output_val'. Also frees the resources allocated during init, 
update and
+  /// merge phases.
+  void Finalize(Tuple* agg_val, Tuple* output_val);
 
   /// Puts the finalized value from Tuple* src in Tuple* dst just as 
Finalize() does.
   /// However, unlike Finalize(), GetValue() does not clean up state in src.
@@ -150,169 +142,155 @@ class AggFnEvaluator {
   /// analytic fn builtins. Note that StringVal result is from local 
allocation (which
   /// will be freed in the next QueryMaintenance()) so it needs to be copied 
out if it
   /// needs to survive beyond QueryMaintenance() (e.g. if 'dst' lives in a row 
batch).
-  void GetValue(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst);
+  void GetValue(Tuple* src, Tuple* dst);
 
   /// Helper functions for calling the above functions on many evaluators.
-  static void Init(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst);
-  static void Add(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, 
Tuple* dst);
-  static void Remove(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, 
Tuple* dst);
-  static void Serialize(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst);
-  static void GetValue(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst);
-  static void Finalize(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst);
-
-  /// Gets the codegened update or merge function for this aggregate function.
-  Status GetUpdateOrMergeFunction(LlvmCodeGen* codegen, llvm::Function** 
uda_fn);
+  static void Init(const std::vector<AggFnEvaluator*>& evals, Tuple* dst);
+  static void Add(const std::vector<AggFnEvaluator*>& evals, const TupleRow* 
src,
+      Tuple* dst);
+  static void Remove(const std::vector<AggFnEvaluator*>& evals,
+      const TupleRow* src, Tuple* dst);
+  static void Serialize(const std::vector<AggFnEvaluator*>& evals,
+      Tuple* dst);
+  static void GetValue(const std::vector<AggFnEvaluator*>& evals, Tuple* src,
+      Tuple* dst);
+  static void Finalize(const std::vector<AggFnEvaluator*>& evals, Tuple* src,
+      Tuple* dst);
+
+  /// Free local allocations made in UDA functions and input arguments' evals.
+  void FreeLocalAllocations();
+  static void FreeLocalAllocations(const std::vector<AggFnEvaluator*>& evals);
+
+  std::string DebugString() const;
+  static std::string DebugString(const std::vector<AggFnEvaluator*>& evals);
+
+  static const char* LLVM_CLASS_NAME;
 
  private:
-  const TFunction fn_;
-  /// Indicates whether to Update() or Merge()
-  const bool is_merge_;
-  /// Indicates which functions must be loaded.
-  const bool is_analytic_fn_;
+  /// True if the evaluator has been initialized.
+  bool opened_ = false;
 
-  /// Slot into which Update()/Merge()/Serialize() write their result. Not 
owned.
-  const SlotDescriptor* intermediate_slot_desc_;
+  /// True if the evaluator has been closed.
+  bool closed_ = false;
 
-  /// Slot into which Finalize() results are written. Not owned. Identical to
-  /// intermediate_slot_desc_ if this agg fn has the same intermediate and 
output type.
-  const SlotDescriptor* output_slot_desc_;
+  /// True if this evaluator is created from a ShallowClone() call.
+  const bool is_clone_;
 
-  /// Expression contexts for this AggFnEvaluator. Empty if there is no
-  /// expression (e.g. count(*)).
-  std::vector<ExprContext*> input_expr_ctxs_;
+  const AggFn& agg_fn_;
 
-  /// The types of the arguments to the aggregate function.
-  const std::vector<FunctionContext::TypeDesc> arg_type_descs_;
+  /// Pointer to the MemPool which all allocations come from.
+  /// Owned by the exec node which owns this evaluator.
+  MemPool* mem_pool_ = nullptr;
 
-  /// The enum for some of the builtins that still require special cased logic.
-  AggregationOp agg_op_;
+  /// This contains runtime state such as constant input arguments to the 
aggregate
+  /// functions and a FreePool from which the intermediate values are 
allocated.
+  /// Owned by this evaluator.
+  boost::scoped_ptr<FunctionContext> agg_fn_ctx_;
 
-  /// Created to a subclass of AnyVal for type(). We use this to convert values
-  /// from the UDA interface to the Expr interface.
-  /// These objects are allocated in the runtime state's object pool.
-  /// TODO: this is awful, remove this when exprs are updated.
-  std::vector<impala_udf::AnyVal*> staging_input_vals_;
-  impala_udf::AnyVal* staging_intermediate_val_;
-  impala_udf::AnyVal* staging_merge_input_val_;
+  /// Evaluators for input expressions for this aggregate function.
+  /// Empty if there is no input expression (e.g. count(*)).
+  std::vector<ScalarExprEvaluator*> input_evals_;
 
-  /// Cache entry for the library containing the function ptrs.
-  LibCacheEntry* cache_entry_;
+  /// Staging input values used by the interpreted Update() / Merge() paths.
+  /// It stores the evaluation results of input expressions to be passed to the
+  /// Update() / Merge() function.
+  std::vector<impala_udf::AnyVal*> staging_input_vals_;
 
-  /// Function ptrs for the different phases of the aggregate function.
-  void* init_fn_;
-  void* update_fn_;
-  void* remove_fn_;
-  void* merge_fn_;
-  void* serialize_fn_;
-  void* get_value_fn_;
-  void* finalize_fn_;
+  /// Staging intermediate and merged values used in the interpreted
+  /// Update() / Merge() paths.
+  impala_udf::AnyVal* staging_intermediate_val_ = nullptr;
+  impala_udf::AnyVal* staging_merge_input_val_ = nullptr;
 
   /// Use Create() instead.
-  AggFnEvaluator(const TExprNode& desc, bool is_analytic_fn);
+  AggFnEvaluator(const AggFn& agg_fn, MemPool* mem_pool, bool is_clone);
 
   /// Return the intermediate type of the aggregate function.
-  FunctionContext::TypeDesc GetIntermediateTypeDesc() const;
-
-  /// Return the output type of the aggregate function.
-  FunctionContext::TypeDesc GetOutputTypeDesc() const;
-
-  /// TODO: these functions below are not extensible and we need to use 
codegen to
-  /// generate the calls into the UDA functions (like for UDFs).
-  /// Remove these functions when this is supported.
-
-  /// Sets up the arguments to call fn. This converts from the agg-expr 
signature,
-  /// taking TupleRow to the UDA signature taking AnvVals by populating the 
staging
-  /// AnyVals.
-  /// fn must be a function that implement's the UDA Update() signature.
-  void Update(FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst, 
void* fn);
+  inline const SlotDescriptor& intermediate_slot_desc() const;
+  inline const ColumnType& intermediate_type() const;
+
+  /// The interpreted path for the UDA's Update() function. It sets up the 
arguments to
+  /// call 'fn' is either the 'update_fn_' or 'merge_fn_' of agg_fn_, 
depending on whether
+  /// agg_fn_ is a merging aggregation. This converts from the agg-expr 
signature, taking
+  /// TupleRow to the UDA signature taking AnyVals by evaluating any input 
expressions
+  /// and populating the staging input values.
+  ///
+  /// Note that this function may be superseded by the codegend Update() IR 
function
+  /// generated by AggFn::CodegenUpdateOrMergeFunction() when codegen is 
enabled.
+  void Update(const TupleRow* row, Tuple* dst, void* fn);
 
   /// Sets up the arguments to call 'fn'. This converts from the agg-expr 
signature,
-  /// taking TupleRow to the UDA signature taking AnvVals. Writes the 
serialize/finalize
+  /// taking TupleRow to the UDA signature taking AnyVals. Writes the 
serialize/finalize
   /// result to the given destination slot/tuple. 'fn' can be NULL to indicate 
the src
   /// value should simply be written into the destination. Note that StringVal 
result is
   /// from local allocation (which will be freed in the next 
QueryMaintenance()) so it
   /// needs to be copied out if it needs to survive beyond QueryMaintenance() 
(e.g. if
   /// 'dst' lives in a row batch).
-  void SerializeOrFinalize(FunctionContext* agg_fn_ctx, Tuple* src,
-      const SlotDescriptor* dst_slot_desc, Tuple* dst, void* fn);
+  void SerializeOrFinalize(Tuple* src, const SlotDescriptor& dst_slot_desc,
+      Tuple* dst, void* fn);
 
   /// Writes the result in src into dst pointed to by dst_slot_desc
-  void SetDstSlot(FunctionContext* ctx, const impala_udf::AnyVal* src,
-      const SlotDescriptor* dst_slot_desc, Tuple* dst);
+  inline void SetDstSlot(
+      const impala_udf::AnyVal* src, const SlotDescriptor& dst_slot_desc, 
Tuple* dst);
 };
 
-inline void AggFnEvaluator::Add(
-    FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst) {
-  agg_fn_ctx->impl()->IncrementNumUpdates();
-  Update(agg_fn_ctx, row, dst, is_merge() ? merge_fn_ : update_fn_);
+inline void AggFnEvaluator::Add(const TupleRow* row, Tuple* dst) {
+  agg_fn_ctx_->impl()->IncrementNumUpdates();
+  Update(row, dst, agg_fn_.merge_or_update_fn());
 }
-inline void AggFnEvaluator::Remove(
-    FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst) {
-  agg_fn_ctx->impl()->IncrementNumRemoves();
-  Update(agg_fn_ctx, row, dst, remove_fn_);
+
+inline void AggFnEvaluator::Remove(const TupleRow* row, Tuple* dst) {
+  agg_fn_ctx_->impl()->IncrementNumRemoves();
+  Update(row, dst, agg_fn_.remove_fn());
 }
-inline void AggFnEvaluator::Serialize(
-    FunctionContext* agg_fn_ctx, Tuple* tuple) {
-  SerializeOrFinalize(agg_fn_ctx, tuple, intermediate_slot_desc_, tuple, 
serialize_fn_);
+
+inline void AggFnEvaluator::Serialize(Tuple* tuple) {
+  SerializeOrFinalize(tuple, agg_fn_.intermediate_slot_desc(), tuple,
+      agg_fn_.serialize_fn());
 }
-inline void AggFnEvaluator::Finalize(
-    FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst) {
-  SerializeOrFinalize(agg_fn_ctx, src, output_slot_desc_, dst, finalize_fn_);
+
+inline void AggFnEvaluator::Finalize(Tuple* agg_val, Tuple* output_val) {
+  SerializeOrFinalize(agg_val, agg_fn_.output_slot_desc(), output_val,
+      agg_fn_.finalize_fn());
 }
-inline void AggFnEvaluator::GetValue(
-    FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst) {
-  SerializeOrFinalize(agg_fn_ctx, src, output_slot_desc_, dst, get_value_fn_);
+
+inline void AggFnEvaluator::GetValue(Tuple* src, Tuple* dst) {
+  SerializeOrFinalize(src, agg_fn_.output_slot_desc(), dst,
+      agg_fn_.get_value_fn());
 }
 
-inline void AggFnEvaluator::Init(const std::vector<AggFnEvaluator*>& 
evaluators,
-    const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
-  DCHECK_EQ(evaluators.size(), fn_ctxs.size());
-  for (int i = 0; i < evaluators.size(); ++i) {
-    evaluators[i]->Init(fn_ctxs[i], dst);
-  }
+inline void AggFnEvaluator::Init(const std::vector<AggFnEvaluator*>& evals, 
Tuple* dst) {
+  for (int i = 0; i < evals.size(); ++i) evals[i]->Init(dst);
 }
-inline void AggFnEvaluator::Add(const std::vector<AggFnEvaluator*>& evaluators,
-    const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* 
dst) {
-  DCHECK_EQ(evaluators.size(), fn_ctxs.size());
-  for (int i = 0; i < evaluators.size(); ++i) {
-    evaluators[i]->Add(fn_ctxs[i], src, dst);
-  }
+
+inline void AggFnEvaluator::Add(const std::vector<AggFnEvaluator*>& evals,
+    const TupleRow* src, Tuple* dst) {
+  for (int i = 0; i < evals.size(); ++i) evals[i]->Add(src, dst);
 }
-inline void AggFnEvaluator::Remove(const std::vector<AggFnEvaluator*>& 
evaluators,
-    const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* 
dst) {
-  DCHECK_EQ(evaluators.size(), fn_ctxs.size());
-  for (int i = 0; i < evaluators.size(); ++i) {
-    evaluators[i]->Remove(fn_ctxs[i], src, dst);
-  }
+
+inline void AggFnEvaluator::Remove(const std::vector<AggFnEvaluator*>& evals,
+    const TupleRow* src, Tuple* dst) {
+  for (int i = 0; i < evals.size(); ++i) evals[i]->Remove(src, dst);
 }
-inline void AggFnEvaluator::Serialize(const std::vector<AggFnEvaluator*>& 
evaluators,
-    const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
-  DCHECK_EQ(evaluators.size(), fn_ctxs.size());
-  for (int i = 0; i < evaluators.size(); ++i) {
-    evaluators[i]->Serialize(fn_ctxs[i], dst);
-  }
+
+inline void AggFnEvaluator::Serialize(const std::vector<AggFnEvaluator*>& 
evals,
+    Tuple* dst) {
+  for (int i = 0; i < evals.size(); ++i) evals[i]->Serialize(dst);
 }
-inline void AggFnEvaluator::GetValue(const std::vector<AggFnEvaluator*>& 
evaluators,
-    const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
-  DCHECK_EQ(evaluators.size(), fn_ctxs.size());
-  for (int i = 0; i < evaluators.size(); ++i) {
-    evaluators[i]->GetValue(fn_ctxs[i], src, dst);
-  }
+
+inline void AggFnEvaluator::GetValue(const std::vector<AggFnEvaluator*>& evals,
+    Tuple* src, Tuple* dst) {
+  for (int i = 0; i < evals.size(); ++i) evals[i]->GetValue(src, dst);
 }
-inline void AggFnEvaluator::Finalize(const std::vector<AggFnEvaluator*>& 
evaluators,
-    const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
-  DCHECK_EQ(evaluators.size(), fn_ctxs.size());
-  for (int i = 0; i < evaluators.size(); ++i) {
-    evaluators[i]->Finalize(fn_ctxs[i], src, dst);
+
+inline void AggFnEvaluator::Finalize(const std::vector<AggFnEvaluator*>& evals,
+    Tuple* agg_val, Tuple* output_val) {
+  for (int i = 0; i < evals.size(); ++i) {
+    evals[i]->Finalize(agg_val, output_val);
   }
 }
 
+
+
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/agg-fn.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn.cc b/be/src/exprs/agg-fn.cc
new file mode 100644
index 0000000..6e1d75f
--- /dev/null
+++ b/be/src/exprs/agg-fn.cc
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/agg-fn.h"
+
+#include "codegen/llvm-codegen.h"
+#include "exprs/anyval-util.h"
+#include "exprs/scalar-expr.h"
+#include "runtime/descriptors.h"
+#include "runtime/lib-cache.h"
+
+#include "common/names.h"
+
+using namespace impala_udf;
+using namespace llvm;
+
+namespace impala {
+
+AggFn::AggFn(const TExprNode& tnode, const SlotDescriptor& 
intermediate_slot_desc,
+    const SlotDescriptor& output_slot_desc)
+  : Expr(tnode),
+    is_merge_(tnode.agg_expr.is_merge_agg),
+    intermediate_slot_desc_(intermediate_slot_desc),
+    output_slot_desc_(output_slot_desc),
+    arg_type_descs_(AnyValUtil::ColumnTypesToTypeDescs(
+        ColumnType::FromThrift(tnode.agg_expr.arg_types))) {
+  DCHECK(tnode.__isset.fn);
+  DCHECK(tnode.fn.__isset.aggregate_fn);
+  DCHECK_EQ(tnode.node_type, TExprNodeType::AGGREGATE_EXPR);
+  DCHECK_EQ(ColumnType::FromThrift(tnode.type).type,
+      ColumnType::FromThrift(fn_.ret_type).type);
+  const string& fn_name = fn_.name.function_name;
+  if (fn_name == "count") {
+    agg_op_ = COUNT;
+  } else if (fn_name == "min") {
+    agg_op_ = MIN;
+  } else if (fn_name == "max") {
+    agg_op_ = MAX;
+  } else if (fn_name == "sum") {
+    agg_op_ = SUM;
+  } else if (fn_name == "avg") {
+    agg_op_ = AVG;
+  } else if (fn_name == "ndv" || fn_name == "ndv_no_finalize") {
+    agg_op_ = NDV;
+  } else {
+    agg_op_ = OTHER;
+  }
+}
+
+Status AggFn::Init(const RowDescriptor& row_desc, RuntimeState* state) {
+  // Initialize all children (i.e. input exprs to this aggregate expr).
+  for (ScalarExpr* input_expr : children()) {
+    RETURN_IF_ERROR(input_expr->Init(row_desc, state));
+  }
+
+  // Initialize the aggregate expressions' internals.
+  const TAggregateFunction& aggregate_fn = fn_.aggregate_fn;
+  DCHECK_EQ(intermediate_slot_desc_.type().type,
+      ColumnType::FromThrift(aggregate_fn.intermediate_type).type);
+  DCHECK_EQ(output_slot_desc_.type().type, 
ColumnType::FromThrift(fn_.ret_type).type);
+
+  // Load the function pointers. Must have init() and update().
+  if (aggregate_fn.init_fn_symbol.empty() ||
+      aggregate_fn.update_fn_symbol.empty() ||
+      (aggregate_fn.merge_fn_symbol.empty() && 
!aggregate_fn.is_analytic_only_fn)) {
+    // This path is only for partially implemented builtins.
+    DCHECK_EQ(fn_.binary_type, TFunctionBinaryType::BUILTIN);
+    stringstream ss;
+    ss << "Function " << fn_.name.function_name << " is not implemented.";
+    return Status(ss.str());
+  }
+
+  RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
+      aggregate_fn.init_fn_symbol, &init_fn_, &cache_entry_));
+  RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
+      aggregate_fn.update_fn_symbol, &update_fn_, &cache_entry_));
+
+  // Merge() is not defined for purely analytic function.
+  if (!aggregate_fn.is_analytic_only_fn) {
+    RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
+        aggregate_fn.merge_fn_symbol, &merge_fn_, &cache_entry_));
+  }
+  // Serialize(), GetValue(), Remove() and Finalize() are optional
+  if (!aggregate_fn.serialize_fn_symbol.empty()) {
+    RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
+        aggregate_fn.serialize_fn_symbol, &serialize_fn_, &cache_entry_));
+  }
+  if (!aggregate_fn.get_value_fn_symbol.empty()) {
+    RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
+        aggregate_fn.get_value_fn_symbol, &get_value_fn_, &cache_entry_));
+  }
+  if (!aggregate_fn.remove_fn_symbol.empty()) {
+    RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
+        aggregate_fn.remove_fn_symbol, &remove_fn_, &cache_entry_));
+  }
+  if (!aggregate_fn.finalize_fn_symbol.empty()) {
+    RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
+        fn_.aggregate_fn.finalize_fn_symbol, &finalize_fn_, &cache_entry_));
+  }
+  return Status::OK();
+}
+
+Status AggFn::Create(const TExpr& texpr, const RowDescriptor& row_desc,
+    const SlotDescriptor& intermediate_slot_desc, const SlotDescriptor& 
output_slot_desc,
+    RuntimeState* state, AggFn** agg_fn) {
+  *agg_fn = nullptr;
+  ObjectPool* pool = state->obj_pool();
+  const TExprNode& texpr_node = texpr.nodes[0];
+  DCHECK_EQ(texpr_node.node_type, TExprNodeType::AGGREGATE_EXPR);
+  if (!texpr_node.__isset.fn) {
+    return Status("Function not set in thrift AGGREGATE_EXPR node");
+  }
+  AggFn* new_agg_fn =
+      pool->Add(new AggFn(texpr_node, intermediate_slot_desc, 
output_slot_desc));
+  RETURN_IF_ERROR(Expr::CreateTree(texpr, pool, new_agg_fn));
+  Status status = new_agg_fn->Init(row_desc, state);
+  if (UNLIKELY(!status.ok())) {
+    new_agg_fn->Close();
+    return status;
+  }
+  for (ScalarExpr* input_expr : new_agg_fn->children()) {
+    int fn_ctx_idx = 0;
+    input_expr->AssignFnCtxIdx(&fn_ctx_idx);
+  }
+  *agg_fn = new_agg_fn;
+  return Status::OK();
+}
+
+FunctionContext::TypeDesc AggFn::GetIntermediateTypeDesc() const {
+  return AnyValUtil::ColumnTypeToTypeDesc(intermediate_slot_desc_.type());
+}
+
+FunctionContext::TypeDesc AggFn::GetOutputTypeDesc() const {
+  return AnyValUtil::ColumnTypeToTypeDesc(output_slot_desc_.type());
+}
+
+Status AggFn::CodegenUpdateOrMergeFunction(LlvmCodeGen* codegen, Function** 
uda_fn) {
+  const string& symbol =
+      is_merge_ ? fn_.aggregate_fn.merge_fn_symbol : 
fn_.aggregate_fn.update_fn_symbol;
+  vector<ColumnType> fn_arg_types;
+  for (ScalarExpr* input_expr : children()) {
+    fn_arg_types.push_back(input_expr->type());
+  }
+  // The intermediate value is passed as the last argument.
+  fn_arg_types.push_back(intermediate_type());
+  RETURN_IF_ERROR(codegen->LoadFunction(fn_, symbol, nullptr, fn_arg_types,
+      fn_arg_types.size(), false, uda_fn, &cache_entry_));
+
+  // Inline constants into the function body (if there is an IR body).
+  if (!(*uda_fn)->isDeclaration()) {
+    // TODO: IMPALA-4785: we should also replace references to 
GetIntermediateType()
+    // with constants.
+    codegen->InlineConstFnAttrs(GetOutputTypeDesc(), arg_type_descs_, *uda_fn);
+    *uda_fn = codegen->FinalizeFunction(*uda_fn);
+    if (*uda_fn == nullptr) {
+      return Status(TErrorCode::UDF_VERIFY_FAILED, symbol, fn_.hdfs_location);
+    }
+  }
+  return Status::OK();
+}
+
+void AggFn::Close() {
+  // This also closes all the input expressions.
+  Expr::Close();
+}
+
+void AggFn::Close(const vector<AggFn*>& exprs) {
+  for (AggFn* expr : exprs) expr->Close();
+}
+
+string AggFn::DebugString() const {
+  stringstream out;
+  out << "AggFn(op=" << agg_op_;
+  for (ScalarExpr* input_expr : children()) {
+    out << " " << input_expr->DebugString() << ")";
+  }
+  out << ")";
+  return out.str();
+}
+
+string AggFn::DebugString(const vector<AggFn*>& agg_fns) {
+  stringstream out;
+  out << "[";
+  for (int i = 0; i < agg_fns.size(); ++i) {
+    out << (i == 0 ? "" : " ") << agg_fns[i]->DebugString();
+  }
+  out << "]";
+  return out.str();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/agg-fn.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn.h b/be/src/exprs/agg-fn.h
new file mode 100644
index 0000000..9c8dbe1
--- /dev/null
+++ b/be/src/exprs/agg-fn.h
@@ -0,0 +1,180 @@
+ // Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_EXPRS_AGG_FN_H
+#define IMPALA_EXPRS_AGG_FN_H
+
+#include "exprs/expr.h"
+#include "runtime/descriptors.h"
+#include "udf/udf.h"
+
+namespace impala {
+
+using impala_udf::FunctionContext;
+
+class LlvmCodeGen;
+class MemPool;
+class MemTracker;
+class ObjectPool;
+class RuntimeState;
+class Tuple;
+class TupleRow;
+class TExprNode;
+
+/// --- AggFn overview
+///
+/// An aggregate function generates an output over a set of tuple rows.
+/// An example would be AVG() which computes the average of all input rows.
+/// The built-in aggregate functions such as min, max, sum, avg, ndv etc are
+/// in this category.
+///
+/// --- Implementation
+///
+/// AggFn contains the aggregation operations, pointers to the UDAF interface 
functions
+/// implementing various states of aggregation and the descriptors for the 
intermediate
+/// and output values. Please see udf/udf.h for details of the UDAF interfaces.
+///
+/// AggFnEvaluator is the interface for evaluating aggregate functions against 
input
+/// tuple rows. It invokes the following functions at different phases of the 
aggregation:
+///
+/// init_fn_     : An initialization function that initializes the aggregate 
value.
+///
+/// update_fn_   : An update function that processes the arguments for each 
row in the
+///                query result set and accumulates an intermediate result. 
For example,
+///                this function might increment a counter, append to a string 
buffer or
+///                add the input to a culmulative sum.
+///
+/// merge_fn_    : A merge function that combines multiple intermediate 
results into a
+///                single value.
+///
+/// serialize_fn_: A serialization function that flattens any intermediate 
values
+///                containing pointers, and frees any memory allocated during 
the init,
+///                update and merge phases.
+///
+/// finalize_fn_ : A finalize function that either passes through the combined 
result
+///                unchanged, or does one final transformation. Also frees the 
resources
+///                allocated during init, update and merge phases.
+///
+/// get_value_fn_: Used by AnalyticEval node to obtain the current 
intermediate value.
+///
+/// remove_fn_   : Used by AnalyticEval node to undo the update to the 
intermediate value
+///                by an input row as it falls out of a sliding window.
+///
+class AggFn : public Expr {
+ public:
+
+  /// Override the base class' implementation.
+  virtual bool IsAggFn() const { return true; }
+
+  /// Enum for some built-in aggregation ops.
+  enum AggregationOp {
+    COUNT,
+    MIN,
+    MAX,
+    SUM,
+    AVG,
+    NDV,
+    OTHER,
+  };
+
+  /// Creates and initializes an aggregate function from 'texpr' and returns 
it in
+  /// 'agg_fn'. The returned AggFn lives in the ObjectPool of 'state'. 
'row_desc' is
+  /// the row descriptor of the input tuple row; 'intermediate_slot_desc' is 
the slot
+  /// descriptor of the intermediate value; 'output_slot_desc' is the slot 
descriptor
+  /// of the output value. On failure, returns error status and sets 'agg_fn' 
to NULL.
+  static Status Create(const TExpr& texpr, const RowDescriptor& row_desc,
+      const SlotDescriptor& intermediate_slot_desc,
+      const SlotDescriptor& output_slot_desc, RuntimeState* state, AggFn** 
agg_fn)
+      WARN_UNUSED_RESULT;
+
+  bool is_merge() const { return is_merge_; }
+  AggregationOp agg_op() const { return agg_op_; }
+  bool is_count_star() const { return agg_op_ == COUNT && children_.empty(); }
+  bool is_builtin() const { return fn_.binary_type == 
TFunctionBinaryType::BUILTIN; }
+  const std::string& fn_name() const { return fn_.name.function_name; }
+  const ColumnType& intermediate_type() const { return 
intermediate_slot_desc_.type(); }
+  const SlotDescriptor& intermediate_slot_desc() const { return 
intermediate_slot_desc_; }
+  // Output type is the same as Expr::type().
+  const SlotDescriptor& output_slot_desc() const { return output_slot_desc_; }
+  void* remove_fn() const { return remove_fn_; }
+  void* merge_or_update_fn() const { return is_merge_ ? merge_fn_ : 
update_fn_; }
+  void* serialize_fn() const { return serialize_fn_; }
+  void* get_value_fn() const { return get_value_fn_; }
+  void* finalize_fn() const { return finalize_fn_; }
+  bool SupportsRemove() const { return remove_fn_ != nullptr; }
+  bool SupportsSerialize() const { return serialize_fn_ != nullptr; }
+  FunctionContext::TypeDesc GetIntermediateTypeDesc() const;
+  FunctionContext::TypeDesc GetOutputTypeDesc() const;
+  const std::vector<FunctionContext::TypeDesc>& arg_type_descs() const {
+    return arg_type_descs_;
+  }
+
+  /// Generates an IR wrapper function to call update_fn_/merge_fn_ which may 
either be
+  /// cross-compiled or loaded from an external library. The generated IR 
function is
+  /// returned in 'uda_fn'. Returns error status on failure.
+  /// TODO: implement codegen path for init, finalize, serialize functions etc.
+  Status CodegenUpdateOrMergeFunction(LlvmCodeGen* codegen, llvm::Function** 
uda_fn)
+      WARN_UNUSED_RESULT;
+
+  /// Releases all cache entries to libCache for all nodes in the expr tree.
+  virtual void Close();
+  static void Close(const std::vector<AggFn*>& exprs);
+
+  virtual std::string DebugString() const;
+  static std::string DebugString(const std::vector<AggFn*>& exprs);
+
+ private:
+  friend class Expr;
+  friend class AggFnEvaluator;
+
+  /// True if this is a merging aggregation.
+  const bool is_merge_;
+
+  /// Slot into which Update()/Merge()/Serialize() write their result. Not 
owned.
+  const SlotDescriptor& intermediate_slot_desc_;
+
+  /// Slot into which Finalize() results are written. Not owned. Identical to
+  /// intermediate_slot_desc_ if this agg fn has the same intermediate and 
result type.
+  const SlotDescriptor& output_slot_desc_;
+
+  /// The types of the arguments to the aggregate function.
+  const std::vector<FunctionContext::TypeDesc> arg_type_descs_;
+
+  /// The aggregation operation.
+  AggregationOp agg_op_;
+
+  /// Function pointers for the different phases of the aggregate function.
+  void* init_fn_ = nullptr;
+  void* update_fn_ = nullptr;
+  void* remove_fn_ = nullptr;
+  void* merge_fn_ = nullptr;
+  void* serialize_fn_ = nullptr;
+  void* get_value_fn_ = nullptr;
+  void* finalize_fn_ = nullptr;
+
+  AggFn(const TExprNode& node, const SlotDescriptor& intermediate_slot_desc,
+      const SlotDescriptor& output_slot_desc);
+
+  /// Initializes the AggFn and its input expressions. May load the UDAF from 
LibCache
+  /// if necessary.
+  virtual Status Init(const RowDescriptor& desc, RuntimeState* state) 
WARN_UNUSED_RESULT;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/aggregate-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/aggregate-functions-ir.cc 
b/be/src/exprs/aggregate-functions-ir.cc
index 74badf6..c7f0658 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -34,7 +34,6 @@
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "exprs/anyval-util.h"
-#include "exprs/expr.h"
 #include "exprs/hll-bias.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/aggregate-functions.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/aggregate-functions.h 
b/be/src/exprs/aggregate-functions.h
index c504468..5ebcb97 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -21,10 +21,21 @@
 
 #include "udf/udf-internal.h"
 
-using namespace impala_udf;
-
 namespace impala {
 
+using impala_udf::FunctionContext;
+using impala_udf::AnyVal;
+using impala_udf::BooleanVal;
+using impala_udf::TinyIntVal;
+using impala_udf::SmallIntVal;
+using impala_udf::IntVal;
+using impala_udf::BigIntVal;
+using impala_udf::FloatVal;
+using impala_udf::DoubleVal;
+using impala_udf::TimestampVal;
+using impala_udf::StringVal;
+using impala_udf::DecimalVal;
+
 /// Collection of builtin aggregate functions. Aggregate functions implement
 /// the various phases of the aggregation: Init(), Update(), Serialize(), 
Merge(),
 /// and Finalize(). Not all functions need to implement all of the steps and

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/anyval-util.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/anyval-util.h b/be/src/exprs/anyval-util.h
index dfcf1ec..a18786e 100644
--- a/be/src/exprs/anyval-util.h
+++ b/be/src/exprs/anyval-util.h
@@ -27,10 +27,21 @@
 #include "util/decimal-util.h"
 #include "util/hash-util.h"
 
-using namespace impala_udf;
-
 namespace impala {
 
+using impala_udf::FunctionContext;
+using impala_udf::AnyVal;
+using impala_udf::BooleanVal;
+using impala_udf::TinyIntVal;
+using impala_udf::SmallIntVal;
+using impala_udf::IntVal;
+using impala_udf::BigIntVal;
+using impala_udf::FloatVal;
+using impala_udf::DoubleVal;
+using impala_udf::TimestampVal;
+using impala_udf::StringVal;
+using impala_udf::DecimalVal;
+
 class ObjectPool;
 
 /// Utilities for AnyVals
@@ -274,7 +285,7 @@ class AnyValUtil {
               reinterpret_cast<StringVal*>(dst));
           if (type.type == TYPE_VARCHAR) {
             StringVal* sv = reinterpret_cast<StringVal*>(dst);
-            DCHECK(type.len >= 0);
+            DCHECK_GE(type.len, 0);
             DCHECK_LE(sv->len, type.len);
           }
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exprs/bit-byte-functions.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/bit-byte-functions.h 
b/be/src/exprs/bit-byte-functions.h
index 63d4de6..e000047 100644
--- a/be/src/exprs/bit-byte-functions.h
+++ b/be/src/exprs/bit-byte-functions.h
@@ -20,10 +20,14 @@
 
 #include "udf/udf.h"
 
-using namespace impala_udf;
-
 namespace impala {
 
+using impala_udf::FunctionContext;
+using impala_udf::TinyIntVal;
+using impala_udf::SmallIntVal;
+using impala_udf::IntVal;
+using impala_udf::BigIntVal;
+
 class BitByteFunctions {
  public:
   // Count number of bits set in binary representation of integer types (aka 
popcount)

Reply via email to